summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-02-05 20:51:25 +0000
committerGitHub <noreply@github.com>2022-02-05 20:51:25 +0000
commit7ba1b7112f47a2025e8538509a24d8c6d5b1f488 (patch)
tree4003c0c3040a11baf4dcd96f9861c0e70e0300b6 /modules
parenta51d2114c7c0472cf20459dd8916bf48d529ae83 (diff)
downloadgitea-7ba1b7112f47a2025e8538509a24d8c6d5b1f488.tar.gz
gitea-7ba1b7112f47a2025e8538509a24d8c6d5b1f488.zip
Only attempt to flush queue if the underlying worker pool is not finished (#18593)
* Only attempt to flush queue if the underlying worker pool is not finished There is a possible race whereby a worker pool could be cancelled but yet the underlying queue is not empty. This will lead to flush-all cycling because it cannot empty the pool. Signed-off-by: Andrew Thornton <art27@cantab.net> * Apply suggestions from code review Co-authored-by: Gusted <williamzijl7@hotmail.com> Co-authored-by: Gusted <williamzijl7@hotmail.com>
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/manager.go11
-rw-r--r--modules/queue/workerpool.go5
2 files changed, 16 insertions, 0 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 56298a3e00..73c57540be 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -84,6 +84,8 @@ type ManagedPool interface {
BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+ // Done returns a channel that will be closed when the Pool's baseCtx is closed
+ Done() <-chan struct{}
}
// ManagedQueueList implements the sort.Interface
@@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
continue
}
}
+ if pool, ok := mq.Managed.(ManagedPool); ok {
+ // No point into flushing pools when their base's ctx is already done.
+ select {
+ case <-pool.Done():
+ wg.Done()
+ continue
+ default:
+ }
+ }
allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index fd56f782d4..20108d3588 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -74,6 +74,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
return pool
}
+// Done returns when this worker pool's base context has been cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+ return p.baseCtx.Done()
+}
+
// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
atomic.AddInt64(&p.numInQueue, 1)