summaryrefslogtreecommitdiffstats
path: root/modules/queue/unique_queue_disk_channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/unique_queue_disk_channel.go')
-rw-r--r--modules/queue/unique_queue_disk_channel.go7
1 files changed, 5 insertions, 2 deletions
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index 5ee1c396fc..6ab03094ba 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -6,6 +6,7 @@ package queue
import (
"context"
+ "runtime/pprof"
"sync"
"time"
@@ -72,9 +73,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
+ Name: config.Name + "-channel",
},
Workers: config.Workers,
- Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -90,9 +91,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
MaxWorkers: 5,
+ Name: config.Name + "-level",
},
Workers: 0,
- Name: config.Name + "-level",
},
DataDir: config.DataDir,
}
@@ -183,6 +184,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
// Run starts to run the queue
func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
q.lock.Lock()
@@ -301,6 +303,7 @@ func (q *PersistableChannelUniqueQueue) Terminate() {
if q.internal != nil {
q.internal.(*LevelUniqueQueue).Terminate()
}
+ q.channelQueue.baseCtxFinished()
log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
}