* Only attempt to flush queue if the underlying worker pool is not finished
There is a possible race whereby a worker pool could be cancelled but yet the
underlying queue is not empty. This will lead to flush-all cycling because it
cannot empty the pool.
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Apply suggestions from code review
Co-authored-by: Gusted <williamzijl7@hotmail.com>
Co-authored-by: Gusted <williamzijl7@hotmail.com>
BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+ // Done returns a channel that will be closed when the Pool's baseCtx is closed
+ Done() <-chan struct{}
}
// ManagedQueueList implements the sort.Interface
continue
}
}
+ if pool, ok := mq.Managed.(ManagedPool); ok {
+ // No point into flushing pools when their base's ctx is already done.
+ select {
+ case <-pool.Done():
+ wg.Done()
+ continue
+ default:
+ }
+ }
allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
return pool
}
+// Done returns when this worker pool's base context has been cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+ return p.baseCtx.Done()
+}
+
// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
atomic.AddInt64(&p.numInQueue, 1)