summaryrefslogtreecommitdiffstats
path: root/modules/queue/workergroup.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/workergroup.go')
-rw-r--r--modules/queue/workergroup.go24
1 files changed, 15 insertions, 9 deletions
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index 7127ea1117..147a4f335e 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -5,6 +5,7 @@ package queue
import (
"context"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -13,9 +14,10 @@ import (
)
var (
- infiniteTimerC = make(chan time.Time)
- batchDebounceDuration = 100 * time.Millisecond
- workerIdleDuration = 1 * time.Second
+ infiniteTimerC = make(chan time.Time)
+ batchDebounceDuration = 100 * time.Millisecond
+ workerIdleDuration = 1 * time.Second
+ shutdownDefaultTimeout = 2 * time.Second
unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
)
@@ -116,13 +118,15 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
// If the queue is shutting down, it returns true and try to push the items
// Otherwise it does nothing and returns false
func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
- ctxShutdown := q.ctxShutdown.Load()
- if ctxShutdown == nil {
+ shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
+ if shutdownTimeout == 0 {
return false
}
+ ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
+ defer ctxShutdownCancel()
for _, item := range items {
// if there is still any error, the queue can do nothing instead of losing the items
- if err := q.baseQueue.PushItem(*ctxShutdown, q.marshal(item)); err != nil {
+ if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
}
}
@@ -246,6 +250,8 @@ var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip re
// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
func (q *WorkerPoolQueue[T]) doRun() {
+ pprof.SetGoroutineLabels(q.ctxRun)
+
log.Debug("Queue %q starts running", q.GetName())
defer log.Debug("Queue %q stops running", q.GetName())
@@ -271,8 +277,8 @@ func (q *WorkerPoolQueue[T]) doRun() {
}
}
- ctxShutdownPtr := q.ctxShutdown.Load()
- if ctxShutdownPtr != nil {
+ shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
+ if shutdownTimeout != 0 {
// if there is a shutdown context, try to push the items back to the base queue
q.basePushForShutdown(unhandled...)
workerDone := make(chan struct{})
@@ -280,7 +286,7 @@ func (q *WorkerPoolQueue[T]) doRun() {
go func() { wg.wg.Wait(); close(workerDone) }()
select {
case <-workerDone:
- case <-(*ctxShutdownPtr).Done():
+ case <-time.After(shutdownTimeout):
log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
}
} else {