diff options
Diffstat (limited to 'modules/queue/workerqueue_test.go')
-rw-r--r-- | modules/queue/workerqueue_test.go | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go index a08b02a123..d66253ff66 100644 --- a/modules/queue/workerqueue_test.go +++ b/modules/queue/workerqueue_test.go @@ -5,8 +5,10 @@ package queue import ( "context" + "slices" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -250,23 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) { func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) { defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)() - defer mockBackoffDuration(10 * time.Millisecond)() + defer mockBackoffDuration(5 * time.Millisecond)() + var q *WorkerPoolQueue[int] + var handledCount atomic.Int32 + var hasOnlyOneWorkerRunning atomic.Bool handler := func(items ...int) (unhandled []int) { - time.Sleep(50 * time.Millisecond) + handledCount.Add(int32(len(items))) + // make each work have different duration, and check the active worker number periodically + var activeNums []int + for i := 0; i < 5-items[0]%2; i++ { + time.Sleep(workerIdleDuration * 2) + activeNums = append(activeNums, q.GetWorkerActiveNumber()) + } + // When the queue never becomes empty, the existing workers should keep working + // It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */ + // If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started. + if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) { + hasOnlyOneWorkerRunning.Store(true) + } return nil } - - q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) + q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) stop := runWorkerPoolQueue(q) - for i := 0; i < 20; i++ { + for i := 0; i < 100; i++ { assert.NoError(t, q.Push(i)) } - time.Sleep(500 * time.Millisecond) - assert.EqualValues(t, 2, q.GetWorkerNumber()) - assert.EqualValues(t, 2, q.GetWorkerActiveNumber()) - // when the queue never becomes empty, the existing workers should keep working - assert.EqualValues(t, 2, q.workerStartedCounter) + assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test + assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting") stop() } |