summaryrefslogtreecommitdiffstats
path: root/modules/queue/workerqueue_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/workerqueue_test.go')
-rw-r--r--modules/queue/workerqueue_test.go30
1 files changed, 11 insertions, 19 deletions
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go
index da9451cd77..e60120162a 100644
--- a/modules/queue/workerqueue_test.go
+++ b/modules/queue/workerqueue_test.go
@@ -16,17 +16,9 @@ import (
)
func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
- var stop func()
- started := make(chan struct{})
- stopped := make(chan struct{})
- go func() {
- q.Run(func(f func()) { stop = f; close(started) }, nil)
- close(stopped)
- }()
- <-started
+ go q.Run()
return func() {
- stop()
- <-stopped
+ q.ShutdownWait(1 * time.Second)
}
}
@@ -57,7 +49,7 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) {
return unhandled
}
- q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", queueSetting, handler, false)
+ q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < queueSetting.Length; i++ {
testRecorder.Record("push:%v", i)
@@ -145,7 +137,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
return nil
}
- q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+ q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
stop := runWorkerPoolQueue(q)
for i := 0; i < testCount; i++ {
_ = q.Push("task-" + strconv.Itoa(i))
@@ -169,7 +161,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
return nil
}
- q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+ q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
stop := runWorkerPoolQueue(q)
assert.NoError(t, q.FlushWithContext(context.Background(), 0))
stop()
@@ -194,7 +186,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
return nil
}
- q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
+ q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < 5; i++ {
assert.NoError(t, q.Push(i))
@@ -210,7 +202,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
assert.EqualValues(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
stop()
- q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
+ q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
stop = runWorkerPoolQueue(q)
for i := 0; i < 15; i++ {
assert.NoError(t, q.Push(i))
@@ -238,23 +230,23 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
if items[0] == 0 {
close(handlerCalled)
}
- time.Sleep(100 * time.Millisecond)
+ time.Sleep(400 * time.Millisecond)
return items
}
qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20}
- q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+ q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < qs.Length; i++ {
assert.NoError(t, q.Push(i))
}
<-handlerCalled
- time.Sleep(50 * time.Millisecond) // wait for a while to make sure all workers are active
+ time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active
assert.EqualValues(t, 4, q.GetWorkerActiveNumber())
stop() // stop triggers shutdown
assert.EqualValues(t, 0, q.GetWorkerActiveNumber())
// no item was ever handled, so we still get all of them again
- q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+ q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
assert.EqualValues(t, 20, q.GetQueueItemNumber())
}