summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_disk_channel_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/queue_disk_channel_test.go')
-rw-r--r--modules/queue/queue_disk_channel_test.go76
1 files changed, 57 insertions, 19 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index f092bb1f56..22b4f0f452 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -221,6 +221,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
queueShutdown := []func(){}
queueTerminate := []func(){}
+ terminated := make(chan struct{})
tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data")
assert.NoError(t, err)
@@ -237,15 +238,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
}, &testData{})
assert.NoError(t, err)
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
+ go func() {
+ queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+ close(terminated)
+ }()
// Shutdown and Terminate in defer
defer func() {
@@ -417,7 +421,10 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
return
- default:
+ case <-terminated:
+ case <-time.After(10 * time.Second):
+ assert.Fail(t, "Queue should have terminated")
+ return
}
lock.Lock()
@@ -425,6 +432,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
lock.Unlock()
// Reopen queue
+ terminated = make(chan struct{})
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
DataDir: tmpDir,
BatchLength: 1,
@@ -442,15 +450,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
paused, _ = pausable.IsPausedIsResumed()
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
+ go func() {
+ queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+ close(terminated)
+ }()
select {
case <-handleChan:
@@ -510,4 +521,31 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
+
+ lock.Lock()
+ callbacks = make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ queueShutdown = queueShutdown[:0]
+ lock.Unlock()
+ // Now shutdown the queue
+ for _, callback := range callbacks {
+ callback()
+ }
+
+ // terminate the queue
+ lock.Lock()
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ queueShutdown = queueTerminate[:0]
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+
+ select {
+ case <-time.After(10 * time.Second):
+ assert.Fail(t, "Queue should have terminated")
+ return
+ case <-terminated:
+ }
}