diff options
Diffstat (limited to 'modules/queue/queue_disk_channel_test.go')
-rw-r--r-- | modules/queue/queue_disk_channel_test.go | 102 |
1 files changed, 46 insertions, 56 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 026982fd92..f092bb1f56 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -207,7 +207,6 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { log.Info("pausing") pausable.Pause() } - pushBack = false lock.Unlock() return data } @@ -248,6 +247,25 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { queueTerminate = append(queueTerminate, terminate) }) + // Shutdown and Terminate in defer + defer func() { + lock.Lock() + callbacks := make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + lock.Lock() + log.Info("Finally terminating") + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + }() + test1 := testData{"A", 1} test2 := testData{"B", 2} @@ -263,14 +281,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Equal(t, test1.TestInt, result1.TestInt) pausable.Pause() - paused, resumed := pausable.IsPausedIsResumed() + paused, _ := pausable.IsPausedIsResumed() select { case <-paused: - case <-resumed: - assert.Fail(t, "Queue should not be resumed") - return - default: + case <-time.After(100 * time.Millisecond): assert.Fail(t, "Queue is not paused") return } @@ -287,14 +302,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Nil(t, result2) pausable.Resume() - paused, resumed = pausable.IsPausedIsResumed() + _, resumed := pausable.IsPausedIsResumed() select { - case <-paused: - assert.Fail(t, "Queue should be resumed") - return case <-resumed: - default: + case <-time.After(100 * time.Millisecond): assert.Fail(t, "Queue should be resumed") return } @@ -308,24 +320,27 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Equal(t, test2.TestString, result2.TestString) assert.Equal(t, test2.TestInt, result2.TestInt) + // Set pushBack to so that the next handle will result in a Pause lock.Lock() pushBack = true lock.Unlock() - paused, resumed = pausable.IsPausedIsResumed() + // Ensure that we're still resumed + _, resumed = pausable.IsPausedIsResumed() select { - case <-paused: - assert.Fail(t, "Queue should not be paused") - return case <-resumed: - default: + case <-time.After(100 * time.Millisecond): assert.Fail(t, "Queue is not resumed") return } + // push test1 queue.Push(&test1) + // Now as this is handled it should pause + paused, _ = pausable.IsPausedIsResumed() + select { case <-paused: case <-handleChan: @@ -336,27 +351,16 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { return } - paused, resumed = pausable.IsPausedIsResumed() - - select { - case <-paused: - case <-resumed: - assert.Fail(t, "Queue should not be resumed") - return - default: - assert.Fail(t, "Queue is not paused") - return - } + lock.Lock() + pushBack = false + lock.Unlock() pausable.Resume() - paused, resumed = pausable.IsPausedIsResumed() + _, resumed = pausable.IsPausedIsResumed() select { - case <-paused: - assert.Fail(t, "Queue should not be paused") - return case <-resumed: - default: + case <-time.After(500 * time.Millisecond): assert.Fail(t, "Queue should be resumed") return } @@ -373,6 +377,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { lock.Lock() callbacks := make([]func(), len(queueShutdown)) copy(callbacks, queueShutdown) + queueShutdown = queueShutdown[:0] lock.Unlock() // Now shutdown the queue for _, callback := range callbacks { @@ -402,6 +407,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { lock.Lock() callbacks = make([]func(), len(queueTerminate)) copy(callbacks, queueTerminate) + queueShutdown = queueTerminate[:0] lock.Unlock() for _, callback := range callbacks { callback() @@ -453,14 +459,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { case <-paused: } - paused, resumed = pausable.IsPausedIsResumed() + paused, _ = pausable.IsPausedIsResumed() select { case <-paused: - case <-resumed: - assert.Fail(t, "Queue should not be resumed") - return - default: + case <-time.After(500 * time.Millisecond): assert.Fail(t, "Queue is not paused") return } @@ -472,14 +475,15 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { default: } + lock.Lock() + pushBack = false + lock.Unlock() + pausable.Resume() - paused, resumed = pausable.IsPausedIsResumed() + _, resumed = pausable.IsPausedIsResumed() select { - case <-paused: - assert.Fail(t, "Queue should not be paused") - return case <-resumed: - default: + case <-time.After(500 * time.Millisecond): assert.Fail(t, "Queue should be resumed") return } @@ -506,18 +510,4 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Equal(t, test2.TestString, result4.TestString) assert.Equal(t, test2.TestInt, result4.TestInt) - lock.Lock() - callbacks = make([]func(), len(queueShutdown)) - copy(callbacks, queueShutdown) - lock.Unlock() - for _, callback := range callbacks { - callback() - } - lock.Lock() - callbacks = make([]func(), len(queueTerminate)) - copy(callbacks, queueTerminate) - lock.Unlock() - for _, callback := range callbacks { - callback() - } } |