aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/manager.go')
-rw-r--r--modules/queue/manager.go11
1 files changed, 11 insertions, 0 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 56298a3e00..73c57540be 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -84,6 +84,8 @@ type ManagedPool interface {
BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+ // Done returns a channel that will be closed when the Pool's baseCtx is closed
+ Done() <-chan struct{}
}
// ManagedQueueList implements the sort.Interface
@@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
continue
}
}
+ if pool, ok := mq.Managed.(ManagedPool); ok {
+ // No point into flushing pools when their base's ctx is already done.
+ select {
+ case <-pool.Done():
+ wg.Done()
+ continue
+ default:
+ }
+ }
allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {