aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-02-08 18:53:34 +0000
committerGitHub <noreply@github.com>2022-02-08 13:53:34 -0500
commitf8b21ac04a02a784fe14941e5c89940448694b8c (patch)
tree15764eb02242278a7ef348946bddeef04b8d5963 /modules/queue
parentc14ca34d576ab6be82ff93c05ddd1a9537269830 (diff)
downloadgitea-f8b21ac04a02a784fe14941e5c89940448694b8c.tar.gz
gitea-f8b21ac04a02a784fe14941e5c89940448694b8c.zip
Simplify Boost/Pause logic (#18673)
* Simplify Boost/Pause logic #18658 has added a check to see if we need to boost because there is still work to do however the check is slightly complex and not ideal. There's no point boosting if the queue is paused or can't scale. Therefore merge the two selects into one and add a check to p.paused. Signed-off-by: Andrew Thornton <art27@cantab.net> * And on resume add a zeroboost if necessary Signed-off-by: Andrew Thornton <art27@cantab.net> * simplify Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/workerpool.go53
1 files changed, 36 insertions, 17 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 39ea59b7b1..100197c5e1 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.cond.Broadcast()
cancel()
}
- if p.hasNoWorkerScaling() {
- select {
- case <-p.baseCtx.Done():
- // Don't warn if the baseCtx is shutdown
- default:
- log.Warn(
- "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
- "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
- }
- p.pause()
- }
select {
case <-p.baseCtx.Done():
- // this worker queue is shut-down don't reboost
+ // Don't warn or check for ongoing work if the baseCtx is shutdown
+ case <-p.paused:
+ // Don't warn or check for ongoing work if the pool is paused
default:
- if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
+ if p.hasNoWorkerScaling() {
+ log.Warn(
+ "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
+ "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
+ p.pause()
+ } else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
// OK there are no workers but... there's still work to be done -> Reboost
p.zeroBoost()
// p.lock will be unlocked by zeroBoost
@@ -385,14 +381,37 @@ func (p *WorkerPool) pause() {
// Resume resumes the WorkerPool
func (p *WorkerPool) Resume() {
- p.lock.Lock()
- defer p.lock.Unlock()
+ p.lock.Lock() // can't defer unlock because of the zeroBoost at the end
select {
case <-p.resumed:
+ // already resumed - there's nothing to do
+ p.lock.Unlock()
+ return
default:
- p.paused = make(chan struct{})
- close(p.resumed)
}
+
+ p.paused = make(chan struct{})
+ close(p.resumed)
+
+ // OK now we need to check if we need to add some workers...
+ if p.numberOfWorkers > 0 || p.hasNoWorkerScaling() || atomic.LoadInt64(&p.numInQueue) == 0 {
+ // We either have workers, can't scale or there's no work to be done -> so just resume
+ p.lock.Unlock()
+ return
+ }
+
+ // OK we got some work but no workers we need to think about boosting
+ select {
+ case <-p.baseCtx.Done():
+ // don't bother boosting if the baseCtx is done
+ p.lock.Unlock()
+ return
+ default:
+ }
+
+ // OK we'd better add some boost workers!
+ p.zeroBoost()
+ // p.zeroBoost will unlock the lock
}
// CleanUp will drain the remaining contents of the channel