diff options
Diffstat (limited to 'modules/queue/queue_disk_channel_test.go')
-rw-r--r-- | modules/queue/queue_disk_channel_test.go | 76 |
1 files changed, 57 insertions, 19 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index f092bb1f56..22b4f0f452 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -221,6 +221,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { queueShutdown := []func(){} queueTerminate := []func(){} + terminated := make(chan struct{}) tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data") assert.NoError(t, err) @@ -237,15 +238,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(shutdown func()) { - lock.Lock() - defer lock.Unlock() - queueShutdown = append(queueShutdown, shutdown) - }, func(terminate func()) { - lock.Lock() - defer lock.Unlock() - queueTerminate = append(queueTerminate, terminate) - }) + go func() { + queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + close(terminated) + }() // Shutdown and Terminate in defer defer func() { @@ -417,7 +421,10 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { case <-handleChan: assert.Fail(t, "Handler processing should have stopped") return - default: + case <-terminated: + case <-time.After(10 * time.Second): + assert.Fail(t, "Queue should have terminated") + return } lock.Lock() @@ -425,6 +432,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { lock.Unlock() // Reopen queue + terminated = make(chan struct{}) queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ DataDir: tmpDir, BatchLength: 1, @@ -442,15 +450,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { paused, _ = pausable.IsPausedIsResumed() - go queue.Run(func(shutdown func()) { - lock.Lock() - defer lock.Unlock() - queueShutdown = append(queueShutdown, shutdown) - }, func(terminate func()) { - lock.Lock() - defer lock.Unlock() - queueTerminate = append(queueTerminate, terminate) - }) + go func() { + queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + close(terminated) + }() select { case <-handleChan: @@ -510,4 +521,31 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Equal(t, test2.TestString, result4.TestString) assert.Equal(t, test2.TestInt, result4.TestInt) + + lock.Lock() + callbacks = make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + queueShutdown = queueShutdown[:0] + lock.Unlock() + // Now shutdown the queue + for _, callback := range callbacks { + callback() + } + + // terminate the queue + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + queueShutdown = queueTerminate[:0] + lock.Unlock() + for _, callback := range callbacks { + callback() + } + + select { + case <-time.After(10 * time.Second): + assert.Fail(t, "Queue should have terminated") + return + case <-terminated: + } } |