summaryrefslogtreecommitdiffstats
path: root/modules/queue/workerpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/workerpool.go')
-rw-r--r--modules/queue/workerpool.go44
1 files changed, 43 insertions, 1 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 244927880e..b32128cb82 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -463,13 +463,43 @@ func (p *WorkerPool) IsEmpty() bool {
return atomic.LoadInt64(&p.numInQueue) == 0
}
+// contextError returns either ctx.Done(), the base context's error or nil
+func (p *WorkerPool) contextError(ctx context.Context) error {
+ select {
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ return nil
+ }
+}
+
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
// NB: The worker will not be registered with the manager.
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
log.Trace("WorkerPool: %d Flush", p.qid)
+ paused, _ := p.IsPausedIsResumed()
for {
+ // Because select will return any case that is satisified at random we precheck here before looking at dataChan.
+ select {
+ case <-paused:
+ // Ensure that even if paused that the cancelled error is still sent
+ return p.contextError(ctx)
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
select {
- case data := <-p.dataChan:
+ case <-paused:
+ return p.contextError(ctx)
+ case data, ok := <-p.dataChan:
+ if !ok {
+ return nil
+ }
if unhandled := p.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
}
@@ -495,6 +525,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
paused, _ := p.IsPausedIsResumed()
data := make([]Data, 0, p.batchLength)
for {
+ // Because select will return any case that is satisified at random we precheck here before looking at dataChan.
select {
case <-paused:
log.Trace("Worker for Queue %d Pausing", p.qid)
@@ -515,8 +546,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
log.Trace("Worker shutting down")
return
}
+ case <-ctx.Done():
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ if unhandled := p.handle(data...); unhandled != nil {
+ log.Error("Unhandled Data in queue %d", p.qid)
+ }
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
+ }
+ log.Trace("Worker shutting down")
+ return
default:
}
+
select {
case <-paused:
// go back around