return q.FlushWithContext(ctx)
}
-// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
-func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
- log.Trace("ChannelQueue: %d Flush", q.qid)
- paused, _ := q.IsPausedIsResumed()
- for {
- select {
- case <-paused:
- return nil
- case data, ok := <-q.dataChan:
- if !ok {
- return nil
- }
- if unhandled := q.handle(data); unhandled != nil {
- log.Error("Unhandled Data whilst flushing queue %d", q.qid)
- }
- atomic.AddInt64(&q.numInQueue, -1)
- case <-q.baseCtx.Done():
- return q.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- return nil
- }
- }
-}
-
// Shutdown processing from this queue
func (q *ChannelQueue) Shutdown() {
q.lock.Lock()
"fmt"
"runtime/pprof"
"sync"
- "sync/atomic"
"time"
"code.gitea.io/gitea/modules/container"
return q.FlushWithContext(ctx)
}
-// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
-func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
- log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
- paused, _ := q.IsPausedIsResumed()
- for {
- select {
- case <-paused:
- return nil
- default:
- }
- select {
- case data, ok := <-q.dataChan:
- if !ok {
- return nil
- }
- if unhandled := q.handle(data); unhandled != nil {
- log.Error("Unhandled Data whilst flushing queue %d", q.qid)
- }
- atomic.AddInt64(&q.numInQueue, -1)
- case <-q.baseCtx.Done():
- return q.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- return nil
- }
- }
-}
-
// Shutdown processing from this queue
func (q *ChannelUniqueQueue) Shutdown() {
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
return atomic.LoadInt64(&p.numInQueue) == 0
}
+// contextError returns either ctx.Done(), the base context's error or nil
+func (p *WorkerPool) contextError(ctx context.Context) error {
+ select {
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ return nil
+ }
+}
+
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
// NB: The worker will not be registered with the manager.
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
log.Trace("WorkerPool: %d Flush", p.qid)
+ paused, _ := p.IsPausedIsResumed()
for {
+ // Because select will return any case that is satisified at random we precheck here before looking at dataChan.
+ select {
+ case <-paused:
+ // Ensure that even if paused that the cancelled error is still sent
+ return p.contextError(ctx)
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
select {
- case data := <-p.dataChan:
+ case <-paused:
+ return p.contextError(ctx)
+ case data, ok := <-p.dataChan:
+ if !ok {
+ return nil
+ }
if unhandled := p.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
}
paused, _ := p.IsPausedIsResumed()
data := make([]Data, 0, p.batchLength)
for {
+ // Because select will return any case that is satisified at random we precheck here before looking at dataChan.
select {
case <-paused:
log.Trace("Worker for Queue %d Pausing", p.qid)
log.Trace("Worker shutting down")
return
}
+ case <-ctx.Done():
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ if unhandled := p.handle(data...); unhandled != nil {
+ log.Error("Unhandled Data in queue %d", p.qid)
+ }
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
+ }
+ log.Trace("Worker shutting down")
+ return
default:
}
+
select {
case <-paused:
// go back around