summaryrefslogtreecommitdiffstats
path: root/modules/queue/workerqueue_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/workerqueue_test.go')
-rw-r--r--modules/queue/workerqueue_test.go33
1 files changed, 23 insertions, 10 deletions
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go
index a08b02a123..d66253ff66 100644
--- a/modules/queue/workerqueue_test.go
+++ b/modules/queue/workerqueue_test.go
@@ -5,8 +5,10 @@ package queue
import (
"context"
+ "slices"
"strconv"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -250,23 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
- defer mockBackoffDuration(10 * time.Millisecond)()
+ defer mockBackoffDuration(5 * time.Millisecond)()
+ var q *WorkerPoolQueue[int]
+ var handledCount atomic.Int32
+ var hasOnlyOneWorkerRunning atomic.Bool
handler := func(items ...int) (unhandled []int) {
- time.Sleep(50 * time.Millisecond)
+ handledCount.Add(int32(len(items)))
+ // make each work have different duration, and check the active worker number periodically
+ var activeNums []int
+ for i := 0; i < 5-items[0]%2; i++ {
+ time.Sleep(workerIdleDuration * 2)
+ activeNums = append(activeNums, q.GetWorkerActiveNumber())
+ }
+ // When the queue never becomes empty, the existing workers should keep working
+ // It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
+ // If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
+ if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) {
+ hasOnlyOneWorkerRunning.Store(true)
+ }
return nil
}
-
- q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
+ q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
stop := runWorkerPoolQueue(q)
- for i := 0; i < 20; i++ {
+ for i := 0; i < 100; i++ {
assert.NoError(t, q.Push(i))
}
-
time.Sleep(500 * time.Millisecond)
- assert.EqualValues(t, 2, q.GetWorkerNumber())
- assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
- // when the queue never becomes empty, the existing workers should keep working
- assert.EqualValues(t, 2, q.workerStartedCounter)
+ assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test
+ assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting")
stop()
}