diff options
Diffstat (limited to 'modules/queue/workerqueue_test.go')
-rw-r--r-- | modules/queue/workerqueue_test.go | 30 |
1 files changed, 11 insertions, 19 deletions
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go index da9451cd77..e60120162a 100644 --- a/modules/queue/workerqueue_test.go +++ b/modules/queue/workerqueue_test.go @@ -16,17 +16,9 @@ import ( ) func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() { - var stop func() - started := make(chan struct{}) - stopped := make(chan struct{}) - go func() { - q.Run(func(f func()) { stop = f; close(started) }, nil) - close(stopped) - }() - <-started + go q.Run() return func() { - stop() - <-stopped + q.ShutdownWait(1 * time.Second) } } @@ -57,7 +49,7 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) { return unhandled } - q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", queueSetting, handler, false) + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false) stop := runWorkerPoolQueue(q) for i := 0; i < queueSetting.Length; i++ { testRecorder.Record("push:%v", i) @@ -145,7 +137,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett return nil } - q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true) + q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true) stop := runWorkerPoolQueue(q) for i := 0; i < testCount; i++ { _ = q.Push("task-" + strconv.Itoa(i)) @@ -169,7 +161,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett return nil } - q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true) + q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true) stop := runWorkerPoolQueue(q) assert.NoError(t, q.FlushWithContext(context.Background(), 0)) stop() @@ -194,7 +186,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) { return nil } - q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false) + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false) stop := runWorkerPoolQueue(q) for i := 0; i < 5; i++ { assert.NoError(t, q.Push(i)) @@ -210,7 +202,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) { assert.EqualValues(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working stop() - q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false) + q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false) stop = runWorkerPoolQueue(q) for i := 0; i < 15; i++ { assert.NoError(t, q.Push(i)) @@ -238,23 +230,23 @@ func TestWorkerPoolQueueShutdown(t *testing.T) { if items[0] == 0 { close(handlerCalled) } - time.Sleep(100 * time.Millisecond) + time.Sleep(400 * time.Millisecond) return items } qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20} - q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false) + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) stop := runWorkerPoolQueue(q) for i := 0; i < qs.Length; i++ { assert.NoError(t, q.Push(i)) } <-handlerCalled - time.Sleep(50 * time.Millisecond) // wait for a while to make sure all workers are active + time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active assert.EqualValues(t, 4, q.GetWorkerActiveNumber()) stop() // stop triggers shutdown assert.EqualValues(t, 0, q.GetWorkerActiveNumber()) // no item was ever handled, so we still get all of them again - q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false) + q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) assert.EqualValues(t, 20, q.GetQueueItemNumber()) } |