summaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/queue_channel.go26
-rw-r--r--modules/queue/unique_queue_channel.go30
-rw-r--r--modules/queue/workerpool.go44
3 files changed, 43 insertions, 57 deletions
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 431f48390c..6f75b8357e 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -109,32 +109,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error {
return q.FlushWithContext(ctx)
}
-// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
-func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
- log.Trace("ChannelQueue: %d Flush", q.qid)
- paused, _ := q.IsPausedIsResumed()
- for {
- select {
- case <-paused:
- return nil
- case data, ok := <-q.dataChan:
- if !ok {
- return nil
- }
- if unhandled := q.handle(data); unhandled != nil {
- log.Error("Unhandled Data whilst flushing queue %d", q.qid)
- }
- atomic.AddInt64(&q.numInQueue, -1)
- case <-q.baseCtx.Done():
- return q.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- return nil
- }
- }
-}
-
// Shutdown processing from this queue
func (q *ChannelQueue) Shutdown() {
q.lock.Lock()
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index f2d3dbdc97..c43bd1db3f 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -8,7 +8,6 @@ import (
"fmt"
"runtime/pprof"
"sync"
- "sync/atomic"
"time"
"code.gitea.io/gitea/modules/container"
@@ -167,35 +166,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
return q.FlushWithContext(ctx)
}
-// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
-func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
- log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
- paused, _ := q.IsPausedIsResumed()
- for {
- select {
- case <-paused:
- return nil
- default:
- }
- select {
- case data, ok := <-q.dataChan:
- if !ok {
- return nil
- }
- if unhandled := q.handle(data); unhandled != nil {
- log.Error("Unhandled Data whilst flushing queue %d", q.qid)
- }
- atomic.AddInt64(&q.numInQueue, -1)
- case <-q.baseCtx.Done():
- return q.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- return nil
- }
- }
-}
-
// Shutdown processing from this queue
func (q *ChannelUniqueQueue) Shutdown() {
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 244927880e..b32128cb82 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -463,13 +463,43 @@ func (p *WorkerPool) IsEmpty() bool {
return atomic.LoadInt64(&p.numInQueue) == 0
}
+// contextError returns either ctx.Done(), the base context's error or nil
+func (p *WorkerPool) contextError(ctx context.Context) error {
+ select {
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ return nil
+ }
+}
+
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
// NB: The worker will not be registered with the manager.
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
log.Trace("WorkerPool: %d Flush", p.qid)
+ paused, _ := p.IsPausedIsResumed()
for {
+		// Because select will return any case that is satisfied at random we precheck here before looking at dataChan.
+ select {
+ case <-paused:
+			// Ensure that even if paused the cancelled error is still sent
+ return p.contextError(ctx)
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
select {
- case data := <-p.dataChan:
+ case <-paused:
+ return p.contextError(ctx)
+ case data, ok := <-p.dataChan:
+ if !ok {
+ return nil
+ }
if unhandled := p.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
}
@@ -495,6 +525,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
paused, _ := p.IsPausedIsResumed()
data := make([]Data, 0, p.batchLength)
for {
+		// Because select will return any case that is satisfied at random we precheck here before looking at dataChan.
select {
case <-paused:
log.Trace("Worker for Queue %d Pausing", p.qid)
@@ -515,8 +546,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
log.Trace("Worker shutting down")
return
}
+ case <-ctx.Done():
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ if unhandled := p.handle(data...); unhandled != nil {
+ log.Error("Unhandled Data in queue %d", p.qid)
+ }
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
+ }
+ log.Trace("Worker shutting down")
+ return
default:
}
+
select {
case <-paused:
// go back around