diff options
author | zeripath <art27@cantab.net> | 2021-05-15 15:22:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-15 16:22:26 +0200 |
commit | ba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch) | |
tree | ddd9ff13b0da7b272b5a60445a997319cb0de882 /modules/queue/workerpool.go | |
parent | 9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff) | |
download | gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip |
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines
Coalesce the shutdownfns etc into a list of functions that get run at shutdown
rather then have them run at goroutines blocked on selects.
This may help reduce the background select/poll load in certain
configurations.
* The LevelDB queues can actually wait on empty instead of polling
Slight refactor to cause leveldb queues to wait on empty instead of polling.
* Shutdown the shadow level queue once it is empty
* Remove bytefifo additional goroutine for readToChan as it can just be run in run
* Remove additional removeWorkers goroutine for workers
* Simplify the AtShutdown and AtTerminate functions and add Channel Flusher
* Add shutdown flusher to CUQ
* move persistable channel shutdown stuff to Shutdown Fn
* Ensure that UPCQ has the correct config
* handle shutdown during the flushing
* reduce risk of race between zeroBoost and addWorkers
* prevent double shutdown
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue/workerpool.go')
-rw-r--r-- | modules/queue/workerpool.go | 55 |
1 files changed, 21 insertions, 34 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0f15ccac9e..0176e2e0b2 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -21,7 +21,7 @@ import ( type WorkerPool struct { lock sync.Mutex baseCtx context.Context - cancel context.CancelFunc + baseCtxCancel context.CancelFunc cond *sync.Cond qid int64 maxNumberOfWorkers int @@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo dataChan := make(chan Data, config.QueueLength) pool := &WorkerPool{ baseCtx: ctx, - cancel: cancel, + baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, handle: handle, @@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) { } func (p *WorkerPool) zeroBoost() { - ctx, cancel := context.WithCancel(p.baseCtx) + ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -94,26 +94,14 @@ func (p *WorkerPool) zeroBoost() { start := time.Now() pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } + cancel = func() { mq.RemoveWorkers(pid) - cancel() - }() + } } else { log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } - cancel() - }() } p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(ctx, cancel, boost) } func (p *WorkerPool) pushBoost(data Data) { @@ -140,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) { return } p.blockTimeout *= 2 - ctx, cancel := context.WithCancel(p.baseCtx) + boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -150,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) { log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false) go func() { - <-ctx.Done() + <-boostCtx.Done() mq.RemoveWorkers(pid) - cancel() + boostCtxCancel() }() } else { log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) } go func() { <-time.After(p.boostTimeout) - cancel() + boostCtxCancel() p.lock.Lock() p.blockTimeout /= 2 p.lock.Unlock() }() p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(boostCtx, boostCtxCancel, boost) p.dataChan <- data } } @@ -243,28 +231,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is mq := GetManager().GetManagedQueue(p.qid) if mq != nil { pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher) - go func() { - <-ctx.Done() - mq.RemoveWorkers(pid) - cancel() - }() log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid) - } else { - log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) - + return ctx, func() { + mq.RemoveWorkers(pid) + } } + log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) + return ctx, cancel } // AddWorkers adds workers to the pool - this allows the number of workers to go above the limit func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { ctx, cancel := p.commonRegisterWorkers(number, timeout, false) - p.addWorkers(ctx, number) + p.addWorkers(ctx, cancel, number) return cancel } // addWorkers adds workers to the pool -func (p *WorkerPool) addWorkers(ctx context.Context, number int) { +func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) { for i := 0; i < number; i++ { p.lock.Lock() if p.cond == nil { @@ -279,11 +264,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) { p.numberOfWorkers-- if p.numberOfWorkers == 0 { p.cond.Broadcast() + cancel() } else if p.numberOfWorkers < 0 { // numberOfWorkers can't go negative but... log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid) p.numberOfWorkers = 0 p.cond.Broadcast() + cancel() } p.lock.Unlock() }() |