Diffstat (limited to 'modules/queue/workerpool.go')
-rw-r--r-- | modules/queue/workerpool.go | 94
1 file changed, 87 insertions(+), 7 deletions(-)
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 25fc7dd644..63ec897481 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -7,12 +7,16 @@ package queue
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"code.gitea.io/gitea/modules/log"
 )
 
-// WorkerPool takes
+// WorkerPool represents a dynamically growable worker pool for a
+// provided handler function. It has an internal channel which it
+// uses to detect blocking, and it will grow and shrink in response
+// to demand as per configuration.
 type WorkerPool struct {
 	lock               sync.Mutex
 	baseCtx            context.Context
@@ -27,10 +31,42 @@ type WorkerPool struct {
 	blockTimeout       time.Duration
 	boostTimeout       time.Duration
 	boostWorkers       int
+	numInQueue         int64
+}
+
+// WorkerPoolConfiguration is the basic configuration for a WorkerPool
+type WorkerPoolConfiguration struct {
+	QueueLength  int
+	BatchLength  int
+	BlockTimeout time.Duration
+	BoostTimeout time.Duration
+	BoostWorkers int
+	MaxWorkers   int
+}
+
+// NewWorkerPool creates a new worker pool
+func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	dataChan := make(chan Data, config.QueueLength)
+	pool := &WorkerPool{
+		baseCtx:            ctx,
+		cancel:             cancel,
+		batchLength:        config.BatchLength,
+		dataChan:           dataChan,
+		handle:             handle,
+		blockTimeout:       config.BlockTimeout,
+		boostTimeout:       config.BoostTimeout,
+		boostWorkers:       config.BoostWorkers,
+		maxNumberOfWorkers: config.MaxWorkers,
+	}
+
+	return pool
 }
 
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
+	atomic.AddInt64(&p.numInQueue, 1)
 	p.lock.Lock()
 	if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
 		p.lock.Unlock()
@@ -80,7 +116,7 @@ func (p *WorkerPool) pushBoost(data Data) {
 				log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
 
 				start := time.Now()
-				pid := mq.RegisterWorkers(boost, start, false, start, cancel)
+				pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
 				go func() {
 					<-ctx.Done()
 					mq.RemoveWorkers(pid)
@@ -138,8 +174,8 @@ func (p *WorkerPool) BlockTimeout() time.Duration {
 	return p.blockTimeout
 }
 
-// SetSettings sets the setable boost values
-func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+// SetPoolSettings sets the settable boost values
+func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	p.maxNumberOfWorkers = maxNumberOfWorkers
@@ -156,8 +192,7 @@ func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
 	p.maxNumberOfWorkers = newMax
 }
 
-// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
-func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) {
 	var ctx context.Context
 	var cancel context.CancelFunc
 	start := time.Now()
@@ -173,7 +208,7 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
 
 	mq := GetManager().GetManagedQueue(p.qid)
 	if mq != nil {
-		pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
+		pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
 		go func() {
 			<-ctx.Done()
 			mq.RemoveWorkers(pid)
@@ -184,6 +219,12 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
 		log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
 	}
 
+	return ctx, cancel
+}
+
+// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
+func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+	ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
 	p.addWorkers(ctx, number)
 	return cancel
 }
@@ -235,6 +276,7 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
 	close(p.dataChan)
 	for data := range p.dataChan {
 		p.handle(data)
+		atomic.AddInt64(&p.numInQueue, -1)
 		select {
 		case <-ctx.Done():
 			log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
@@ -245,6 +287,37 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
 	log.Trace("WorkerPool: %d CleanUp Done", p.qid)
 }
 
+// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
+func (p *WorkerPool) Flush(timeout time.Duration) error {
+	ctx, cancel := p.commonRegisterWorkers(1, timeout, true)
+	defer cancel()
+	return p.FlushWithContext(ctx)
+}
+
+// IsEmpty returns true if the worker queue is empty
+func (p *WorkerPool) IsEmpty() bool {
+	return atomic.LoadInt64(&p.numInQueue) == 0
+}
+
+// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
+// NB: The worker will not be registered with the manager.
+func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
+	log.Trace("WorkerPool: %d Flush", p.qid)
+	for {
+		select {
+		case data := <-p.dataChan:
+			p.handle(data)
+			atomic.AddInt64(&p.numInQueue, -1)
+		case <-p.baseCtx.Done():
+			return p.baseCtx.Err()
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			return nil
+		}
+	}
+}
+
 func (p *WorkerPool) doWork(ctx context.Context) {
 	delay := time.Millisecond * 300
 	var data = make([]Data, 0, p.batchLength)
@@ -254,6 +327,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -263,6 +337,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -271,6 +346,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) >= p.batchLength {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				data = make([]Data, 0, p.batchLength)
 			}
 		default:
@@ -286,6 +362,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -301,6 +378,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
 			return
@@ -309,6 +387,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) >= p.batchLength {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				data = make([]Data, 0, p.batchLength)
 			}
 		case <-timer.C:
@@ -316,6 +395,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
 				p.handle(data...)
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				data = make([]Data, 0, p.batchLength)
 			}
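The counter-plus-drain pattern this patch introduces is easier to see outside the diff. Below is a minimal, self-contained sketch, not Gitea's code: pool, push, and flushWithContext are illustrative stand-ins for WorkerPool, Push, and FlushWithContext, and string stands in for the package's Data type. It shows an atomic in-queue counter incremented before each send and decremented after each handle, with a flush loop whose default case returns once the buffered channel is empty.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// pool mirrors the shape of the patched WorkerPool: a buffered data
// channel plus an atomic count of items pushed but not yet handled.
type pool struct {
	dataChan   chan string
	numInQueue int64
}

// push increments the counter before the send, as Push does in the patch.
func (p *pool) push(d string) {
	atomic.AddInt64(&p.numInQueue, 1)
	p.dataChan <- d
}

// flushWithContext drains dataChan, returning nil as soon as the channel
// is empty (the default case fires) or the context's error if it is
// cancelled first - the same select shape as FlushWithContext above.
func (p *pool) flushWithContext(ctx context.Context) error {
	for {
		select {
		case d := <-p.dataChan:
			fmt.Println("handled:", d) // stand-in for p.handle(data)
			atomic.AddInt64(&p.numInQueue, -1)
		case <-ctx.Done():
			return ctx.Err()
		default:
			return nil // channel drained
		}
	}
}

// isEmpty reports whether everything pushed has been handled.
func (p *pool) isEmpty() bool {
	return atomic.LoadInt64(&p.numInQueue) == 0
}

func main() {
	p := &pool{dataChan: make(chan string, 10)}
	p.push("a")
	p.push("b")

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := p.flushWithContext(ctx); err != nil {
		fmt.Println("flush error:", err)
	}
	fmt.Println("empty:", p.isEmpty()) // true: both items were handled
}

The ordering matters: Push increments before the channel send, and every consumer (doWork batches, CleanUp, FlushWithContext) decrements only after handle returns, so IsEmpty reports true only once queued data has actually been processed, not merely received from the channel.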