diff options
author | zeripath <art27@cantab.net> | 2022-02-05 20:51:25 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-05 20:51:25 +0000 |
commit | 7ba1b7112f47a2025e8538509a24d8c6d5b1f488 (patch) | |
tree | 4003c0c3040a11baf4dcd96f9861c0e70e0300b6 /modules | |
parent | a51d2114c7c0472cf20459dd8916bf48d529ae83 (diff) | |
download | gitea-7ba1b7112f47a2025e8538509a24d8c6d5b1f488.tar.gz gitea-7ba1b7112f47a2025e8538509a24d8c6d5b1f488.zip |
Only attempt to flush queue if the underlying worker pool is not finished (#18593)
* Only attempt to flush queue if the underlying worker pool is not finished
There is a possible race whereby a worker pool could be cancelled but yet the
underlying queue is not empty. This will lead to flush-all cycling because it
cannot empty the pool.
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Apply suggestions from code review
Co-authored-by: Gusted <williamzijl7@hotmail.com>
Co-authored-by: Gusted <williamzijl7@hotmail.com>
Diffstat (limited to 'modules')
-rw-r--r-- | modules/queue/manager.go | 11 | ||||
-rw-r--r-- | modules/queue/workerpool.go | 5 |
2 files changed, 16 insertions, 0 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 56298a3e00..73c57540be 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -84,6 +84,8 @@ type ManagedPool interface { BoostWorkers() int // SetPoolSettings sets the user updatable settings for the pool SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + // Done returns a channel that will be closed when the Pool's baseCtx is closed + Done() <-chan struct{} } // ManagedQueueList implements the sort.Interface @@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error continue } } + if pool, ok := mq.Managed.(ManagedPool); ok { + // No point into flushing pools when their base's ctx is already done. + select { + case <-pool.Done(): + wg.Done() + continue + default: + } + } allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index fd56f782d4..20108d3588 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -74,6 +74,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo return pool } +// Done returns when this worker pool's base context has been cancelled +func (p *WorkerPool) Done() <-chan struct{} { + return p.baseCtx.Done() +} + // Push pushes the data to the internal channel func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) |