aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/workerpool.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2021-05-15 15:22:26 +0100
committerGitHub <noreply@github.com>2021-05-15 16:22:26 +0200
commitba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch)
treeddd9ff13b0da7b272b5a60445a997319cb0de882 /modules/queue/workerpool.go
parent9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff)
downloadgitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz
gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue/workerpool.go')
-rw-r--r--modules/queue/workerpool.go55
1 files changed, 21 insertions, 34 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 0f15ccac9e..0176e2e0b2 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -21,7 +21,7 @@ import (
type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
- cancel context.CancelFunc
+ baseCtxCancel context.CancelFunc
cond *sync.Cond
qid int64
maxNumberOfWorkers int
@@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
dataChan := make(chan Data, config.QueueLength)
pool := &WorkerPool{
baseCtx: ctx,
- cancel: cancel,
+ baseCtxCancel: cancel,
batchLength: config.BatchLength,
dataChan: dataChan,
handle: handle,
@@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) {
}
func (p *WorkerPool) zeroBoost() {
- ctx, cancel := context.WithCancel(p.baseCtx)
+ ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@@ -94,26 +94,14 @@ func (p *WorkerPool) zeroBoost() {
start := time.Now()
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
- go func() {
- select {
- case <-ctx.Done():
- case <-time.After(p.boostTimeout):
- }
+ cancel = func() {
mq.RemoveWorkers(pid)
- cancel()
- }()
+ }
} else {
log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
- go func() {
- select {
- case <-ctx.Done():
- case <-time.After(p.boostTimeout):
- }
- cancel()
- }()
}
p.lock.Unlock()
- p.addWorkers(ctx, boost)
+ p.addWorkers(ctx, cancel, boost)
}
func (p *WorkerPool) pushBoost(data Data) {
@@ -140,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) {
return
}
p.blockTimeout *= 2
- ctx, cancel := context.WithCancel(p.baseCtx)
+ boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx)
mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
@@ -150,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now()
- pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
+ pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false)
go func() {
- <-ctx.Done()
+ <-boostCtx.Done()
mq.RemoveWorkers(pid)
- cancel()
+ boostCtxCancel()
}()
} else {
log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
}
go func() {
<-time.After(p.boostTimeout)
- cancel()
+ boostCtxCancel()
p.lock.Lock()
p.blockTimeout /= 2
p.lock.Unlock()
}()
p.lock.Unlock()
- p.addWorkers(ctx, boost)
+ p.addWorkers(boostCtx, boostCtxCancel, boost)
p.dataChan <- data
}
}
@@ -243,28 +231,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is
mq := GetManager().GetManagedQueue(p.qid)
if mq != nil {
pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
- go func() {
- <-ctx.Done()
- mq.RemoveWorkers(pid)
- cancel()
- }()
log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
- } else {
- log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
-
+ return ctx, func() {
+ mq.RemoveWorkers(pid)
+ }
}
+ log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
+
return ctx, cancel
}
// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
- p.addWorkers(ctx, number)
+ p.addWorkers(ctx, cancel, number)
return cancel
}
// addWorkers adds workers to the pool
-func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
+func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) {
for i := 0; i < number; i++ {
p.lock.Lock()
if p.cond == nil {
@@ -279,11 +264,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
p.numberOfWorkers--
if p.numberOfWorkers == 0 {
p.cond.Broadcast()
+ cancel()
} else if p.numberOfWorkers < 0 {
// numberOfWorkers can't go negative but...
log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
p.numberOfWorkers = 0
p.cond.Broadcast()
+ cancel()
}
p.lock.Unlock()
}()