aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_disk_channel_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/queue_disk_channel_test.go')
-rw-r--r--modules/queue/queue_disk_channel_test.go102
1 files changed, 46 insertions, 56 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index 026982fd92..f092bb1f56 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -207,7 +207,6 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
log.Info("pausing")
pausable.Pause()
}
- pushBack = false
lock.Unlock()
return data
}
@@ -248,6 +247,25 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
queueTerminate = append(queueTerminate, terminate)
})
+ // Shutdown and Terminate in defer
+ defer func() {
+ lock.Lock()
+ callbacks := make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ lock.Lock()
+ log.Info("Finally terminating")
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ }()
+
test1 := testData{"A", 1}
test2 := testData{"B", 2}
@@ -263,14 +281,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test1.TestInt, result1.TestInt)
pausable.Pause()
- paused, resumed := pausable.IsPausedIsResumed()
+ paused, _ := pausable.IsPausedIsResumed()
select {
case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
@@ -287,14 +302,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Nil(t, result2)
pausable.Resume()
- paused, resumed = pausable.IsPausedIsResumed()
+ _, resumed := pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should be resumed")
- return
case <-resumed:
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
return
}
@@ -308,24 +320,27 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt)
+ // Set pushBack to so that the next handle will result in a Pause
lock.Lock()
pushBack = true
lock.Unlock()
- paused, resumed = pausable.IsPausedIsResumed()
+ // Ensure that we're still resumed
+ _, resumed = pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
case <-resumed:
- default:
+ case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not resumed")
return
}
+ // push test1
queue.Push(&test1)
+ // Now as this is handled it should pause
+ paused, _ = pausable.IsPausedIsResumed()
+
select {
case <-paused:
case <-handleChan:
@@ -336,27 +351,16 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
return
}
- paused, resumed = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
- assert.Fail(t, "Queue is not paused")
- return
- }
+ lock.Lock()
+ pushBack = false
+ lock.Unlock()
pausable.Resume()
- paused, resumed = pausable.IsPausedIsResumed()
+ _, resumed = pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
case <-resumed:
- default:
+ case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
return
}
@@ -373,6 +377,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
lock.Lock()
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
+ queueShutdown = queueShutdown[:0]
lock.Unlock()
// Now shutdown the queue
for _, callback := range callbacks {
@@ -402,6 +407,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
lock.Lock()
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
+ queueShutdown = queueTerminate[:0]
lock.Unlock()
for _, callback := range callbacks {
callback()
@@ -453,14 +459,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
case <-paused:
}
- paused, resumed = pausable.IsPausedIsResumed()
+ paused, _ = pausable.IsPausedIsResumed()
select {
case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
+ case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
@@ -472,14 +475,15 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
default:
}
+ lock.Lock()
+ pushBack = false
+ lock.Unlock()
+
pausable.Resume()
- paused, resumed = pausable.IsPausedIsResumed()
+ _, resumed = pausable.IsPausedIsResumed()
select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
case <-resumed:
- default:
+ case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
return
}
@@ -506,18 +510,4 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
- lock.Lock()
- callbacks = make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- lock.Lock()
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
}