summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_disk_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/queue_disk_test.go')
-rw-r--r--modules/queue/queue_disk_test.go18
1 files changed, 18 insertions, 0 deletions
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
index 8600b8d868..038d7d8223 100644
--- a/modules/queue/queue_disk_test.go
+++ b/modules/queue/queue_disk_test.go
@@ -8,6 +8,7 @@ import (
"context"
"io/ioutil"
"os"
+ "sync"
"testing"
"time"
@@ -24,6 +25,7 @@ func TestLevelQueue(t *testing.T) {
}
}
+ var lock sync.Mutex
queueShutdown := []func(){}
queueTerminate := []func(){}
@@ -46,9 +48,13 @@ func TestLevelQueue(t *testing.T) {
assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) {
+ lock.Lock()
queueShutdown = append(queueShutdown, shutdown)
+ lock.Unlock()
}, func(_ context.Context, terminate func()) {
+ lock.Lock()
queueTerminate = append(queueTerminate, terminate)
+ lock.Unlock()
})
test1 := testData{"A", 1}
@@ -72,9 +78,12 @@ func TestLevelQueue(t *testing.T) {
err = queue.Push(test1)
assert.Error(t, err)
+ lock.Lock()
for _, callback := range queueShutdown {
callback()
}
+ lock.Unlock()
+
time.Sleep(200 * time.Millisecond)
err = queue.Push(&test1)
assert.NoError(t, err)
@@ -85,9 +94,11 @@ func TestLevelQueue(t *testing.T) {
assert.Fail(t, "Handler processing should have stopped")
default:
}
+ lock.Lock()
for _, callback := range queueTerminate {
callback()
}
+ lock.Unlock()
// Reopen queue
queue, err = NewWrappedQueue(handle,
@@ -109,9 +120,13 @@ func TestLevelQueue(t *testing.T) {
assert.NoError(t, err)
go queue.Run(func(_ context.Context, shutdown func()) {
+ lock.Lock()
queueShutdown = append(queueShutdown, shutdown)
+ lock.Unlock()
}, func(_ context.Context, terminate func()) {
+ lock.Lock()
queueTerminate = append(queueTerminate, terminate)
+ lock.Unlock()
})
result3 := <-handleChan
@@ -121,10 +136,13 @@ func TestLevelQueue(t *testing.T) {
result4 := <-handleChan
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
+
+ lock.Lock()
for _, callback := range queueShutdown {
callback()
}
for _, callback := range queueTerminate {
callback()
}
+ lock.Unlock()
}