diff options
author | wxiaoguang <wxiaoguang@gmail.com> | 2024-04-23 16:30:32 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-23 08:30:32 +0000 |
commit | 32f895f2d7b9d74b84f99f7ea130b62adc6769be (patch) | |
tree | e083ace0132e58dc5a211d1bb8313177a9127b50 /modules | |
parent | d95408bd5d3db8552b8044ffcc8c9e3b31b31f2e (diff) | |
download | gitea-32f895f2d7b9d74b84f99f7ea130b62adc6769be.tar.gz gitea-32f895f2d7b9d74b84f99f7ea130b62adc6769be.zip |
Fix queue test (#30646) (#30650)
Backport #30553 and #30646
Diffstat (limited to 'modules')
-rw-r--r-- | modules/queue/backoff.go | 10 | ||||
-rw-r--r-- | modules/queue/workergroup.go | 18 | ||||
-rw-r--r-- | modules/queue/workerqueue.go | 2 | ||||
-rw-r--r-- | modules/queue/workerqueue_test.go | 32 |
4 files changed, 43 insertions, 19 deletions
diff --git a/modules/queue/backoff.go b/modules/queue/backoff.go index cda7233567..02fc88574a 100644 --- a/modules/queue/backoff.go +++ b/modules/queue/backoff.go @@ -8,7 +8,7 @@ import ( "time" ) -const ( +var ( backoffBegin = 50 * time.Millisecond backoffUpper = 2 * time.Second ) @@ -18,6 +18,14 @@ type ( backoffFuncErr func() (retry bool, err error) ) +func mockBackoffDuration(d time.Duration) func() { + oldBegin, oldUpper := backoffBegin, backoffUpper + backoffBegin, backoffUpper = d, d + return func() { + backoffBegin, backoffUpper = oldBegin, oldUpper + } +} + func backoffRetErr[T any](ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncRetErr[T]) (ret T, err error) { d := begin for { diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go index e3801ef2b2..153123f883 100644 --- a/modules/queue/workergroup.go +++ b/modules/queue/workergroup.go @@ -63,6 +63,8 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh // TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum" // The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later // So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary. + // This data-race is not serious, as long as a new worker will be started soon to make sure there are enough workers, + // so no need to hugely refactor at the moment. q.workerNumMu.Lock() noWorker := q.workerNum == 0 if full || noWorker { @@ -136,6 +138,14 @@ func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool { return true } +func resetIdleTicker(t *time.Ticker, dur time.Duration) { + t.Reset(dur) + select { + case <-t.C: + default: + } +} + // doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items. func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) { wp.wg.Add(1) @@ -146,8 +156,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) { log.Debug("Queue %q starts new worker", q.GetName()) defer log.Debug("Queue %q stops idle worker", q.GetName()) - atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging - t := time.NewTicker(workerIdleDuration) defer t.Stop() @@ -169,11 +177,7 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) { } q.doWorkerHandle(batch) // reset the idle ticker, and drain the tick after reset in case a tick is already triggered - t.Reset(workerIdleDuration) - select { - case <-t.C: - default: - } + resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset case <-t.C: q.workerNumMu.Lock() keepWorking = q.workerNum <= 1 // keep the last worker running diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go index 4160622d81..b28fd88027 100644 --- a/modules/queue/workerqueue.go +++ b/modules/queue/workerqueue.go @@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct { workerMaxNum int workerActiveNum int workerNumMu sync.Mutex - - workerStartedCounter int32 } type flushType chan struct{} diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go index e09669c542..d66253ff66 100644 --- a/modules/queue/workerqueue_test.go +++ b/modules/queue/workerqueue_test.go @@ -5,8 +5,10 @@ package queue import ( "context" + "slices" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -250,22 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) { func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) { defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)() + defer mockBackoffDuration(5 * time.Millisecond)() + var q *WorkerPoolQueue[int] + var handledCount atomic.Int32 + var hasOnlyOneWorkerRunning atomic.Bool handler := func(items ...int) (unhandled []int) { - time.Sleep(50 * time.Millisecond) + handledCount.Add(int32(len(items))) + // make each work have different duration, and check the active worker number periodically + var activeNums []int + for i := 0; i < 5-items[0]%2; i++ { + time.Sleep(workerIdleDuration * 2) + activeNums = append(activeNums, q.GetWorkerActiveNumber()) + } + // When the queue never becomes empty, the existing workers should keep working + // It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */ + // If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started. + if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) { + hasOnlyOneWorkerRunning.Store(true) + } return nil } - - q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) + q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) stop := runWorkerPoolQueue(q) - for i := 0; i < 20; i++ { + for i := 0; i < 100; i++ { assert.NoError(t, q.Push(i)) } - time.Sleep(500 * time.Millisecond) - assert.EqualValues(t, 2, q.GetWorkerNumber()) - assert.EqualValues(t, 2, q.GetWorkerActiveNumber()) - // when the queue never becomes empty, the existing workers should keep working - assert.EqualValues(t, 2, q.workerStartedCounter) + assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test + assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting") stop() } |