author | wxiaoguang <wxiaoguang@gmail.com> | 2024-04-23 07:55:43 +0800
committer | GitHub <noreply@github.com> | 2024-04-22 23:55:43 +0000
commit | e6103955ccc48e19f42dd7b70ad27d0e17205d77 (patch)
tree | e05702b7b0d3bd9460abc0200d6557034215b42e /modules/queue/workergroup.go
parent | 99c5683da5e5c50154dcf9c07229a455a5095058 (diff)
download | gitea-e6103955ccc48e19f42dd7b70ad27d0e17205d77.tar.gz, gitea-e6103955ccc48e19f42dd7b70ad27d0e17205d77.zip
Fix queue test (#30646)
Fix #30643
The old test code was not stable due to the data race described in the
TODO added at that time.
Make it stable, and remove a debug-only field from the old test code.
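
For context, the race referenced above (spelled out in the TODO shown in the diff below) is a check-then-act race on the worker count: the dispatcher checks the worker count under a mutex and may skip starting a worker, while an idle worker can exit and decrement the count right afterwards. The following is a minimal, purely illustrative sketch of that shape; the names (`pool`, `dispatch`, `worker`) are hypothetical and are not Gitea's actual queue API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool is a hypothetical stand-in for the queue's worker bookkeeping.
type pool struct {
	mu        sync.Mutex
	workerNum int
}

// dispatch mirrors the check-then-start pattern: it reads workerNum under the
// mutex and only starts a worker when the buffer is full or no worker exists.
func (p *pool) dispatch(full bool) {
	p.mu.Lock()
	noWorker := p.workerNum == 0
	if full || noWorker {
		p.workerNum++
		go p.worker()
	}
	p.mu.Unlock()
}

// worker simulates an idle worker that exits after a short idle period and
// decrements workerNum. Between one dispatch's check and the next dispatch,
// the count can drop to zero; the queue recovers because the next dispatch
// re-checks the count and starts a fresh worker.
func (p *pool) worker() {
	time.Sleep(10 * time.Millisecond)
	p.mu.Lock()
	p.workerNum--
	p.mu.Unlock()
}

func main() {
	p := &pool{}
	for i := 0; i < 3; i++ {
		p.dispatch(false)
		time.Sleep(20 * time.Millisecond) // long enough for the worker to go idle and exit
	}
	p.mu.Lock()
	fmt.Println("workers left running:", p.workerNum) // likely 0: benign, a new one starts on the next dispatch
	p.mu.Unlock()
}
```

This is also why the diff below only documents the race with an extra comment rather than restructuring the dispatch logic.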
Diffstat (limited to 'modules/queue/workergroup.go')
-rw-r--r-- | modules/queue/workergroup.go | 18
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index e3801ef2b2..153123f883 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -63,6 +63,8 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
 	// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
 	// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
 	// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
+	// This data-race is not serious, as long as a new worker will be started soon to make sure there are enough workers,
+	// so no need to hugely refactor at the moment.
 	q.workerNumMu.Lock()
 	noWorker := q.workerNum == 0
 	if full || noWorker {
@@ -136,6 +138,14 @@ func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
 	return true
 }
 
+func resetIdleTicker(t *time.Ticker, dur time.Duration) {
+	t.Reset(dur)
+	select {
+	case <-t.C:
+	default:
+	}
+}
+
 // doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items.
 func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 	wp.wg.Add(1)
@@ -146,8 +156,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 		log.Debug("Queue %q starts new worker", q.GetName())
 		defer log.Debug("Queue %q stops idle worker", q.GetName())
 
-		atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
-
 		t := time.NewTicker(workerIdleDuration)
 		defer t.Stop()
 
@@ -169,11 +177,7 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 				}
 				q.doWorkerHandle(batch)
 				// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
-				t.Reset(workerIdleDuration)
-				select {
-				case <-t.C:
-				default:
-				}
+				resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
 			case <-t.C:
 				q.workerNumMu.Lock()
 				keepWorking = q.workerNum <= 1 // keep the last worker running
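
The `resetIdleTicker` helper extracted in the diff encapsulates a common `time.Ticker` pattern: `Reset` does not clear the ticker's channel (which is buffered with capacity 1), so a tick that already fired may still be pending, and draining it prevents a spurious idle timeout on the next loop iteration. Below is a self-contained sketch of just that pattern under assumed, arbitrary durations; the surrounding worker loop from the queue code is intentionally omitted.

```go
package main

import (
	"fmt"
	"time"
)

// resetIdleTicker resets the ticker and drains at most one tick that may
// already have been delivered, matching the helper extracted in the diff.
func resetIdleTicker(t *time.Ticker, dur time.Duration) {
	t.Reset(dur)
	select {
	case <-t.C: // discard a tick that fired before the reset
	default:
	}
}

func main() {
	const idle = 50 * time.Millisecond
	t := time.NewTicker(idle)
	defer t.Stop()

	// Simulate handling a batch that takes longer than the idle duration,
	// so a tick is already sitting in t.C when the handling finishes.
	time.Sleep(2 * idle)

	resetIdleTicker(t, idle)

	// After the reset+drain, no tick should arrive before a full idle period.
	select {
	case <-t.C:
		fmt.Println("stale tick leaked through (the drain did not happen)")
	case <-time.After(idle / 2):
		fmt.Println("no stale tick: the pending tick was drained by resetIdleTicker")
	}
}
```

Without the drain, the worker loop's `case <-t.C` branch could fire immediately after a long batch, making a worker look idle right after it did work; extracting the pattern into a named helper also gives the new test (`TestWorkerPoolQueueWorkerIdleReset`, referenced in the diff comment) a single, well-defined piece of behavior to exercise.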