aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/workerpool.go14
1 files changed, 14 insertions, 0 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 20108d3588..39ea59b7b1 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -115,6 +115,9 @@ func (p *WorkerPool) hasNoWorkerScaling() bool {
return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
}
+// zeroBoost will add a temporary boost worker for a no worker queue
+// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
+// (This is because addWorkers has to be called whilst unlocked)
func (p *WorkerPool) zeroBoost() {
ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
mq := GetManager().GetManagedQueue(p.qid)
@@ -316,6 +319,17 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
}
p.pause()
}
+ select {
+ case <-p.baseCtx.Done():
+ // this worker queue is shut-down don't reboost
+ default:
+ if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
+ // OK there are no workers but... there's still work to be done -> Reboost
+ p.zeroBoost()
+ // p.lock will be unlocked by zeroBoost
+ return
+ }
+ }
p.lock.Unlock()
}()
}