source.dussan.org Git - gitea.git/commitdiff
Only attempt to flush queue if the underlying worker pool is not finished (#18593)
author zeripath <art27@cantab.net>
Sat, 5 Feb 2022 20:51:25 +0000 (20:51 +0000)
committer GitHub <noreply@github.com>
Sat, 5 Feb 2022 20:51:25 +0000 (20:51 +0000)
* Only attempt to flush queue if the underlying worker pool is not finished

There is a possible race whereby a worker pool could be cancelled while the
underlying queue is still not empty. This leads to flush-all cycling, because it
can never empty the pool. (A minimal sketch of this behaviour follows the
commit message below.)

Signed-off-by: Andrew Thornton <art27@cantab.net>
* Apply suggestions from code review

Co-authored-by: Gusted <williamzijl7@hotmail.com>
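
For context on the race described in the commit message: below is a minimal, self-contained sketch of the behaviour the fix targets. The pool type, the queued counter and the flushAll loop are invented for illustration and are not Gitea's code; only the non-blocking check on Done() mirrors the actual diff further down.

package main

import (
	"context"
	"fmt"
	"time"
)

// pool is a hypothetical stand-in for a worker pool whose base context can
// be cancelled while its queue still holds items.
type pool struct {
	baseCtx context.Context
	queued  int
}

// Done mirrors the accessor added by this commit: it exposes the base
// context's Done channel so callers can tell when the pool is finished.
func (p *pool) Done() <-chan struct{} {
	return p.baseCtx.Done()
}

// flushAll keeps flushing until every queue is empty or the timeout expires.
func flushAll(pools []*pool, timeout time.Duration) {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		allEmpty := true
		for _, p := range pools {
			if p.queued == 0 {
				continue
			}
			// The fix: a pool whose base context is already done can never
			// be drained, so skip it instead of cycling until the timeout.
			select {
			case <-p.Done():
				continue
			default:
			}
			allEmpty = false
			// A cancelled pool's workers are gone, so nothing would be
			// drained; without the Done() check above this loop would
			// therefore spin until the timeout.
			if p.baseCtx.Err() == nil {
				p.queued--
			}
		}
		if allEmpty {
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the pool is already finished, but items are still queued
	flushAll([]*pool{{baseCtx: ctx, queued: 3}}, 50*time.Millisecond)
	fmt.Println("flushAll returned promptly instead of spinning on the cancelled pool")
}
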
modules/queue/manager.go
modules/queue/workerpool.go

modules/queue/manager.go
index 56298a3e00b5e92155ff3a198cb922816c925700..73c57540be85d40d1019fccffaca50b734f7dd2a 100644 (file)
@@ -84,6 +84,8 @@ type ManagedPool interface {
        BoostWorkers() int
        // SetPoolSettings sets the user updatable settings for the pool
        SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+       // Done returns a channel that will be closed when the Pool's baseCtx is closed
+       Done() <-chan struct{}
 }
 
 // ManagedQueueList implements the sort.Interface
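
The new method extends ManagedPool, which acts as an optional capability interface: the manager discovers at runtime whether a managed queue is backed by a worker pool via a type assertion, as the FlushAll hunk below shows. A small illustrative sketch of that pattern follows; the concrete types are invented, and only the ManagedPool name and the assertion mirror the diff.

package main

import "fmt"

// ManagedPool is the optional capability: only some managed queues are
// backed by a worker pool that can report when its base context is done.
type ManagedPool interface {
	Done() <-chan struct{}
}

// poolBackedQueue implements ManagedPool (illustrative stand-in).
type poolBackedQueue struct{ done chan struct{} }

func (q *poolBackedQueue) Done() <-chan struct{} { return q.done }

// plainQueue does not implement ManagedPool.
type plainQueue struct{}

func describe(managed interface{}) {
	// Callers probe for the capability with a type assertion, mirroring
	// `if pool, ok := mq.Managed.(ManagedPool); ok { ... }` in FlushAll.
	if _, ok := managed.(ManagedPool); ok {
		fmt.Println("backed by a worker pool; Done() is available")
		return
	}
	fmt.Println("no worker pool behind this queue")
}

func main() {
	describe(&poolBackedQueue{done: make(chan struct{})})
	describe(plainQueue{})
}
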
@@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
                                        continue
                                }
                        }
+                       if pool, ok := mq.Managed.(ManagedPool); ok {
+                               // No point in flushing pools when their baseCtx is already done.
+                               select {
+                               case <-pool.Done():
+                                       wg.Done()
+                                       continue
+                               default:
+                               }
+                       }
 
                        allEmpty = false
                        if flushable, ok := mq.Managed.(Flushable); ok {
modules/queue/workerpool.go
index fd56f782d4f9d875d03064c4a9c17f0de7a66386..20108d35886ca00a65d5a73e7b7c659c574baea4 100644 (file)
@@ -74,6 +74,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
        return pool
 }
 
+// Done returns a channel that will be closed when this worker pool's base context is cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+       return p.baseCtx.Done()
+}
+
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
        atomic.AddInt64(&p.numInQueue, 1)
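
The accessor added above simply surfaces the base context's Done channel without handing out the context itself. A minimal sketch of the same pattern, where Pool and NewPool are illustrative stand-ins rather than Gitea's WorkerPool:

package main

import (
	"context"
	"fmt"
)

// Pool is a stand-in for a worker pool that owns a base context.
type Pool struct {
	baseCtx context.Context
	cancel  context.CancelFunc
}

func NewPool(ctx context.Context) *Pool {
	baseCtx, cancel := context.WithCancel(ctx)
	return &Pool{baseCtx: baseCtx, cancel: cancel}
}

// Done exposes only the base context's Done channel, so callers can observe
// cancellation without being handed the context itself.
func (p *Pool) Done() <-chan struct{} {
	return p.baseCtx.Done()
}

func main() {
	p := NewPool(context.Background())

	// Non-blocking check, as in the FlushAll change: the pool is still live.
	select {
	case <-p.Done():
		fmt.Println("pool finished")
	default:
		fmt.Println("pool still running")
	}

	p.cancel()
	<-p.Done() // the channel is now closed
	fmt.Println("pool finished")
}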