summaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/queue_bytefifo.go4
-rw-r--r--modules/queue/queue_channel.go4
-rw-r--r--modules/queue/queue_channel_test.go2
-rw-r--r--modules/queue/queue_disk_channel.go6
-rw-r--r--modules/queue/unique_queue_channel.go3
-rw-r--r--modules/queue/unique_queue_channel_test.go2
-rw-r--r--modules/queue/unique_queue_disk_channel.go7
-rw-r--r--modules/queue/workerpool.go10
8 files changed, 29 insertions, 9 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index ead3828f33..99c6428abc 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -20,7 +21,6 @@ import (
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
- Name string
WaitOnEmpty bool
}
@@ -153,6 +153,7 @@ func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
// Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)
@@ -355,6 +356,7 @@ func (q *ByteFIFOQueue) Terminate() {
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
+ q.baseCtxFinished()
log.Debug("%s: %s Terminated", q.typ, q.name)
}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 5469c03100..028023d500 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync/atomic"
"time"
@@ -20,7 +21,6 @@ const ChannelQueueType Type = "channel"
type ChannelQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
- Name string
}
// ChannelQueue implements Queue
@@ -84,6 +84,7 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
// Run starts to run the queue
func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("ChannelQueue: %s Starting", q.name)
@@ -169,6 +170,7 @@ func (q *ChannelQueue) Terminate() {
default:
}
q.terminateCtxCancel()
+ q.baseCtxFinished()
log.Debug("ChannelQueue: %s Terminated", q.name)
}
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index 26a635b918..d30b908861 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -34,9 +34,9 @@ func TestChannelQueue(t *testing.T) {
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
+ Name: "TestChannelQueue",
},
Workers: 0,
- Name: "TestChannelQueue",
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 0494698e0e..014d93f5b5 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -72,9 +73,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
+ Name: config.Name + "-channel",
},
Workers: config.Workers,
- Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -90,9 +91,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
MaxWorkers: 5,
+ Name: config.Name + "-level",
},
Workers: 0,
- Name: config.Name + "-level",
},
DataDir: config.DataDir,
}
@@ -154,6 +155,7 @@ func (q *PersistableChannelQueue) PushBack(data Data) error {
// Run starts to run the queue
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index b7282e6c6c..6e8d37a20c 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -97,6 +98,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
// Run starts to run the queue
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
@@ -226,6 +228,7 @@ func (q *ChannelUniqueQueue) Terminate() {
default:
}
q.terminateCtxCancel()
+ q.baseCtxFinished()
log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
}
diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go
index ef6752079e..6daf3fc96e 100644
--- a/modules/queue/unique_queue_channel_test.go
+++ b/modules/queue/unique_queue_channel_test.go
@@ -32,9 +32,9 @@ func TestChannelUniqueQueue(t *testing.T) {
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
+ Name: "TestChannelQueue",
},
Workers: 0,
- Name: "TestChannelQueue",
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index 5ee1c396fc..6ab03094ba 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -6,6 +6,7 @@ package queue
import (
"context"
+ "runtime/pprof"
"sync"
"time"
@@ -72,9 +73,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
+ Name: config.Name + "-channel",
},
Workers: config.Workers,
- Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -90,9 +91,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
MaxWorkers: 5,
+ Name: config.Name + "-level",
},
Workers: 0,
- Name: config.Name + "-level",
},
DataDir: config.DataDir,
}
@@ -183,6 +184,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
// Run starts to run the queue
func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
q.lock.Lock()
@@ -301,6 +303,7 @@ func (q *PersistableChannelUniqueQueue) Terminate() {
if q.internal != nil {
q.internal.(*LevelUniqueQueue).Terminate()
}
+ q.channelQueue.baseCtxFinished()
log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
}
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 5f6ec18710..2d8504598a 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -6,11 +6,14 @@ package queue
import (
"context"
+ "fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/util"
)
@@ -22,6 +25,7 @@ type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
baseCtxCancel context.CancelFunc
+ baseCtxFinished process.FinishedFunc
paused chan struct{}
resumed chan struct{}
cond *sync.Cond
@@ -44,6 +48,7 @@ var (
// WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WorkerPoolConfiguration struct {
+ Name string
QueueLength int
BatchLength int
BlockTimeout time.Duration
@@ -54,12 +59,13 @@ type WorkerPoolConfiguration struct {
// NewWorkerPool creates a new worker pool
func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Queue: %s", config.Name), process.SystemProcessType, false)
dataChan := make(chan Data, config.QueueLength)
pool := &WorkerPool{
baseCtx: ctx,
baseCtxCancel: cancel,
+ baseCtxFinished: finished,
batchLength: config.BatchLength,
dataChan: dataChan,
resumed: closedChan,
@@ -299,6 +305,7 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.numberOfWorkers++
p.lock.Unlock()
go func() {
+ pprof.SetGoroutineLabels(ctx)
p.doWork(ctx)
p.lock.Lock()
@@ -476,6 +483,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
}
func (p *WorkerPool) doWork(ctx context.Context) {
+ pprof.SetGoroutineLabels(ctx)
delay := time.Millisecond * 300
// Create a common timer - we will use this elsewhere