summaryrefslogtreecommitdiffstats
path: root/modules/queue/workerpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/workerpool.go')
-rw-r--r--modules/queue/workerpool.go94
1 files changed, 87 insertions, 7 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 25fc7dd644..63ec897481 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -7,12 +7,16 @@ package queue
import (
"context"
"sync"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
)
-// WorkerPool takes
+// WorkerPool represent a dynamically growable worker pool for a
+// provided handler function. They have an internal channel which
+// they use to detect if there is a block and will grow and shrink in
+// response to demand as per configuration.
type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
@@ -27,10 +31,42 @@ type WorkerPool struct {
blockTimeout time.Duration
boostTimeout time.Duration
boostWorkers int
+ numInQueue int64
+}
+
+// WorkerPoolConfiguration is the basic configuration for a WorkerPool
+type WorkerPoolConfiguration struct {
+ QueueLength int
+ BatchLength int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+ MaxWorkers int
+}
+
+// NewWorkerPool creates a new worker pool
+func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ dataChan := make(chan Data, config.QueueLength)
+ pool := &WorkerPool{
+ baseCtx: ctx,
+ cancel: cancel,
+ batchLength: config.BatchLength,
+ dataChan: dataChan,
+ handle: handle,
+ blockTimeout: config.BlockTimeout,
+ boostTimeout: config.BoostTimeout,
+ boostWorkers: config.BoostWorkers,
+ maxNumberOfWorkers: config.MaxWorkers,
+ }
+
+ return pool
}
// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
+ atomic.AddInt64(&p.numInQueue, 1)
p.lock.Lock()
if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
p.lock.Unlock()
@@ -80,7 +116,7 @@ func (p *WorkerPool) pushBoost(data Data) {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now()
- pid := mq.RegisterWorkers(boost, start, false, start, cancel)
+ pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
go func() {
<-ctx.Done()
mq.RemoveWorkers(pid)
@@ -138,8 +174,8 @@ func (p *WorkerPool) BlockTimeout() time.Duration {
return p.blockTimeout
}
-// SetSettings sets the setable boost values
-func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+// SetPoolSettings sets the setable boost values
+func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
p.lock.Lock()
defer p.lock.Unlock()
p.maxNumberOfWorkers = maxNumberOfWorkers
@@ -156,8 +192,7 @@ func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
p.maxNumberOfWorkers = newMax
}
-// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
-func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) {
var ctx context.Context
var cancel context.CancelFunc
start := time.Now()
@@ -173,7 +208,7 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
mq := GetManager().GetManagedQueue(p.qid)
if mq != nil {
- pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
+ pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
go func() {
<-ctx.Done()
mq.RemoveWorkers(pid)
@@ -184,6 +219,12 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
}
+ return ctx, cancel
+}
+
+// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
+func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+ ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
p.addWorkers(ctx, number)
return cancel
}
@@ -235,6 +276,7 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
close(p.dataChan)
for data := range p.dataChan {
p.handle(data)
+ atomic.AddInt64(&p.numInQueue, -1)
select {
case <-ctx.Done():
log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
@@ -245,6 +287,37 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
log.Trace("WorkerPool: %d CleanUp Done", p.qid)
}
+// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
+func (p *WorkerPool) Flush(timeout time.Duration) error {
+ ctx, cancel := p.commonRegisterWorkers(1, timeout, true)
+ defer cancel()
+ return p.FlushWithContext(ctx)
+}
+
+// IsEmpty returns if true if the worker queue is empty
+func (p *WorkerPool) IsEmpty() bool {
+ return atomic.LoadInt64(&p.numInQueue) == 0
+}
+
+// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
+// NB: The worker will not be registered with the manager.
+func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
+ log.Trace("WorkerPool: %d Flush", p.qid)
+ for {
+ select {
+ case data := <-p.dataChan:
+ p.handle(data)
+ atomic.AddInt64(&p.numInQueue, -1)
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ return nil
+ }
+ }
+}
+
func (p *WorkerPool) doWork(ctx context.Context) {
delay := time.Millisecond * 300
var data = make([]Data, 0, p.batchLength)
@@ -254,6 +327,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -263,6 +337,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -271,6 +346,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) >= p.batchLength {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
data = make([]Data, 0, p.batchLength)
}
default:
@@ -286,6 +362,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -301,6 +378,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -309,6 +387,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) >= p.batchLength {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
data = make([]Data, 0, p.batchLength)
}
case <-timer.C:
@@ -316,6 +395,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
data = make([]Data, 0, p.batchLength)
}