diff options
Diffstat (limited to 'modules')
-rw-r--r-- | modules/queue/workerpool.go | 53 |
1 files changed, 36 insertions, 17 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 39ea59b7b1..100197c5e1 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, p.cond.Broadcast() cancel() } - if p.hasNoWorkerScaling() { - select { - case <-p.baseCtx.Done(): - // Don't warn if the baseCtx is shutdown - default: - log.Warn( - "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ - "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) - } - p.pause() - } select { case <-p.baseCtx.Done(): - // this worker queue is shut-down don't reboost + // Don't warn or check for ongoing work if the baseCtx is shutdown + case <-p.paused: + // Don't warn or check for ongoing work if the pool is paused default: - if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 { + if p.hasNoWorkerScaling() { + log.Warn( + "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) + p.pause() + } else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 { // OK there are no workers but... there's still work to be done -> Reboost p.zeroBoost() // p.lock will be unlocked by zeroBoost @@ -385,14 +381,37 @@ func (p *WorkerPool) pause() { // Resume resumes the WorkerPool func (p *WorkerPool) Resume() { - p.lock.Lock() - defer p.lock.Unlock() + p.lock.Lock() // can't defer unlock because of the zeroBoost at the end select { case <-p.resumed: + // already resumed - there's nothing to do + p.lock.Unlock() + return default: - p.paused = make(chan struct{}) - close(p.resumed) } + + p.paused = make(chan struct{}) + close(p.resumed) + + // OK now we need to check if we need to add some workers... + if p.numberOfWorkers > 0 || p.hasNoWorkerScaling() || atomic.LoadInt64(&p.numInQueue) == 0 { + // We either have workers, can't scale or there's no work to be done -> so just resume + p.lock.Unlock() + return + } + + // OK we got some work but no workers we need to think about boosting + select { + case <-p.baseCtx.Done(): + // don't bother boosting if the baseCtx is done + p.lock.Unlock() + return + default: + } + + // OK we'd better add some boost workers! + p.zeroBoost() + // p.zeroBoost will unlock the lock } // CleanUp will drain the remaining contents of the channel |