summaryrefslogtreecommitdiffstats
path: root/modules/queue/workerpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/workerpool.go')
-rw-r--r--modules/queue/workerpool.go10
1 files changed, 9 insertions, 1 deletions
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 5f6ec18710..2d8504598a 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -6,11 +6,14 @@ package queue
import (
"context"
+ "fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/util"
)
@@ -22,6 +25,7 @@ type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
baseCtxCancel context.CancelFunc
+ baseCtxFinished process.FinishedFunc
paused chan struct{}
resumed chan struct{}
cond *sync.Cond
@@ -44,6 +48,7 @@ var (
// WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WorkerPoolConfiguration struct {
+ Name string
QueueLength int
BatchLength int
BlockTimeout time.Duration
@@ -54,12 +59,13 @@ type WorkerPoolConfiguration struct {
// NewWorkerPool creates a new worker pool
func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Queue: %s", config.Name), process.SystemProcessType, false)
dataChan := make(chan Data, config.QueueLength)
pool := &WorkerPool{
baseCtx: ctx,
baseCtxCancel: cancel,
+ baseCtxFinished: finished,
batchLength: config.BatchLength,
dataChan: dataChan,
resumed: closedChan,
@@ -299,6 +305,7 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.numberOfWorkers++
p.lock.Unlock()
go func() {
+ pprof.SetGoroutineLabels(ctx)
p.doWork(ctx)
p.lock.Lock()
@@ -476,6 +483,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
}
func (p *WorkerPool) doWork(ctx context.Context) {
+ pprof.SetGoroutineLabels(ctx)
delay := time.Millisecond * 300
// Create a common timer - we will use this elsewhere