aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/workerpool.go53
1 files changed, 36 insertions, 17 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 39ea59b7b1..100197c5e1 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.cond.Broadcast()
cancel()
}
- if p.hasNoWorkerScaling() {
- select {
- case <-p.baseCtx.Done():
- // Don't warn if the baseCtx is shutdown
- default:
- log.Warn(
- "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
- "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
- }
- p.pause()
- }
select {
case <-p.baseCtx.Done():
- // this worker queue is shut-down don't reboost
+ // Don't warn or check for ongoing work if the baseCtx is shutdown
+ case <-p.paused:
+ // Don't warn or check for ongoing work if the pool is paused
default:
- if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
+ if p.hasNoWorkerScaling() {
+ log.Warn(
+ "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
+ "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
+ p.pause()
+ } else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
// OK there are no workers but... there's still work to be done -> Reboost
p.zeroBoost()
// p.lock will be unlocked by zeroBoost
@@ -385,14 +381,37 @@ func (p *WorkerPool) pause() {
// Resume resumes the WorkerPool
func (p *WorkerPool) Resume() {
- p.lock.Lock()
- defer p.lock.Unlock()
+ p.lock.Lock() // can't defer unlock because of the zeroBoost at the end
select {
case <-p.resumed:
+ // already resumed - there's nothing to do
+ p.lock.Unlock()
+ return
default:
- p.paused = make(chan struct{})
- close(p.resumed)
}
+
+ p.paused = make(chan struct{})
+ close(p.resumed)
+
+ // OK now we need to check if we need to add some workers...
+ if p.numberOfWorkers > 0 || p.hasNoWorkerScaling() || atomic.LoadInt64(&p.numInQueue) == 0 {
+ // We either have workers, can't scale or there's no work to be done -> so just resume
+ p.lock.Unlock()
+ return
+ }
+
+ // OK we got some work but no workers we need to think about boosting
+ select {
+ case <-p.baseCtx.Done():
+ // don't bother boosting if the baseCtx is done
+ p.lock.Unlock()
+ return
+ default:
+ }
+
+ // OK we'd better add some boost workers!
+ p.zeroBoost()
+ // p.zeroBoost will unlock the lock
}
// CleanUp will drain the remaining contents of the channel