diff options
Diffstat (limited to 'modules/queue/workergroup.go')
-rw-r--r-- | modules/queue/workergroup.go | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go index 7127ea1117..147a4f335e 100644 --- a/modules/queue/workergroup.go +++ b/modules/queue/workergroup.go @@ -5,6 +5,7 @@ package queue import ( "context" + "runtime/pprof" "sync" "sync/atomic" "time" @@ -13,9 +14,10 @@ import ( ) var ( - infiniteTimerC = make(chan time.Time) - batchDebounceDuration = 100 * time.Millisecond - workerIdleDuration = 1 * time.Second + infiniteTimerC = make(chan time.Time) + batchDebounceDuration = 100 * time.Millisecond + workerIdleDuration = 1 * time.Second + shutdownDefaultTimeout = 2 * time.Second unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test ) @@ -116,13 +118,15 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) { // If the queue is shutting down, it returns true and try to push the items // Otherwise it does nothing and returns false func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool { - ctxShutdown := q.ctxShutdown.Load() - if ctxShutdown == nil { + shutdownTimeout := time.Duration(q.shutdownTimeout.Load()) + if shutdownTimeout == 0 { return false } + ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer ctxShutdownCancel() for _, item := range items { // if there is still any error, the queue can do nothing instead of losing the items - if err := q.baseQueue.PushItem(*ctxShutdown, q.marshal(item)); err != nil { + if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil { log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err) } } @@ -246,6 +250,8 @@ var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip re // doRun is the main loop of the queue. All related "doXxx" functions are executed in its context. func (q *WorkerPoolQueue[T]) doRun() { + pprof.SetGoroutineLabels(q.ctxRun) + log.Debug("Queue %q starts running", q.GetName()) defer log.Debug("Queue %q stops running", q.GetName()) @@ -271,8 +277,8 @@ func (q *WorkerPoolQueue[T]) doRun() { } } - ctxShutdownPtr := q.ctxShutdown.Load() - if ctxShutdownPtr != nil { + shutdownTimeout := time.Duration(q.shutdownTimeout.Load()) + if shutdownTimeout != 0 { // if there is a shutdown context, try to push the items back to the base queue q.basePushForShutdown(unhandled...) workerDone := make(chan struct{}) @@ -280,7 +286,7 @@ func (q *WorkerPoolQueue[T]) doRun() { go func() { wg.wg.Wait(); close(workerDone) }() select { case <-workerDone: - case <-(*ctxShutdownPtr).Done(): + case <-time.After(shutdownTimeout): log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName()) } } else { |