diff options
author | zeripath <art27@cantab.net> | 2022-03-31 18:01:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-31 19:01:43 +0200 |
commit | c88547ce71a554091930e129c20776daf6da35ac (patch) | |
tree | 9232a7b0f07686698a9adbb51a3d3d72ebeaf12b /modules/queue | |
parent | 9c349a4277926bfd3ff0360301765ad7abd9f10b (diff) | |
download | gitea-c88547ce71a554091930e129c20776daf6da35ac.tar.gz gitea-c88547ce71a554091930e129c20776daf6da35ac.zip |
Add Goroutine stack inspector to admin/monitor (#19207)
Continues on from #19202.
Following the addition of pprof labels we can now more easily understand the relationship between a goroutine and the requests that spawn them.
This PR takes advantage of the labels and adds a few others, then provides a mechanism for the monitoring page to query the pprof goroutine profile.
The binary profile that results from this profile is immediately piped in to the google library for parsing this and then stack traces are formed for the goroutines.
If the goroutine is within a context or has been created from a goroutine within a process context it will acquire the process description labels for that process.
The goroutines are mapped with there associate pids and any that do not have an associated pid are placed in a group at the bottom as unbound.
In this way we should be able to more easily examine goroutines that have been stuck.
A manager command `gitea manager processes` is also provided that can export the processes (with or without stacktraces) to the command line.
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/queue_bytefifo.go | 4 | ||||
-rw-r--r-- | modules/queue/queue_channel.go | 4 | ||||
-rw-r--r-- | modules/queue/queue_channel_test.go | 2 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel.go | 6 | ||||
-rw-r--r-- | modules/queue/unique_queue_channel.go | 3 | ||||
-rw-r--r-- | modules/queue/unique_queue_channel_test.go | 2 | ||||
-rw-r--r-- | modules/queue/unique_queue_disk_channel.go | 7 | ||||
-rw-r--r-- | modules/queue/workerpool.go | 10 |
8 files changed, 29 insertions, 9 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index ead3828f33..99c6428abc 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -7,6 +7,7 @@ package queue import ( "context" "fmt" + "runtime/pprof" "sync" "sync/atomic" "time" @@ -20,7 +21,6 @@ import ( type ByteFIFOQueueConfiguration struct { WorkerPoolConfiguration Workers int - Name string WaitOnEmpty bool } @@ -153,6 +153,7 @@ func (q *ByteFIFOQueue) Flush(timeout time.Duration) error { // Run runs the bytefifo queue func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { + pprof.SetGoroutineLabels(q.baseCtx) atShutdown(q.Shutdown) atTerminate(q.Terminate) log.Debug("%s: %s Starting", q.typ, q.name) @@ -355,6 +356,7 @@ func (q *ByteFIFOQueue) Terminate() { if err := q.byteFIFO.Close(); err != nil { log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) } + q.baseCtxFinished() log.Debug("%s: %s Terminated", q.typ, q.name) } diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 5469c03100..028023d500 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,6 +7,7 @@ package queue import ( "context" "fmt" + "runtime/pprof" "sync/atomic" "time" @@ -20,7 +21,6 @@ const ChannelQueueType Type = "channel" type ChannelQueueConfiguration struct { WorkerPoolConfiguration Workers int - Name string } // ChannelQueue implements Queue @@ -84,6 +84,7 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro // Run starts to run the queue func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) { + pprof.SetGoroutineLabels(q.baseCtx) atShutdown(q.Shutdown) atTerminate(q.Terminate) log.Debug("ChannelQueue: %s Starting", q.name) @@ -169,6 +170,7 @@ func (q *ChannelQueue) Terminate() { default: } q.terminateCtxCancel() + q.baseCtxFinished() log.Debug("ChannelQueue: %s Terminated", q.name) } diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index 26a635b918..d30b908861 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -34,9 +34,9 @@ func TestChannelQueue(t *testing.T) { BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, BoostWorkers: 5, + Name: "TestChannelQueue", }, Workers: 0, - Name: "TestChannelQueue", }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 0494698e0e..014d93f5b5 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -7,6 +7,7 @@ package queue import ( "context" "fmt" + "runtime/pprof" "sync" "sync/atomic" "time" @@ -72,9 +73,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( BoostTimeout: config.BoostTimeout, BoostWorkers: config.BoostWorkers, MaxWorkers: config.MaxWorkers, + Name: config.Name + "-channel", }, Workers: config.Workers, - Name: config.Name + "-channel", }, exemplar) if err != nil { return nil, err @@ -90,9 +91,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( BoostTimeout: 5 * time.Minute, BoostWorkers: 1, MaxWorkers: 5, + Name: config.Name + "-level", }, Workers: 0, - Name: config.Name + "-level", }, DataDir: config.DataDir, } @@ -154,6 +155,7 @@ func (q *PersistableChannelQueue) PushBack(data Data) error { // Run starts to run the queue func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { + pprof.SetGoroutineLabels(q.channelQueue.baseCtx) log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index b7282e6c6c..6e8d37a20c 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -7,6 +7,7 @@ package queue import ( "context" "fmt" + "runtime/pprof" "sync" "sync/atomic" "time" @@ -97,6 +98,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue // Run starts to run the queue func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { + pprof.SetGoroutineLabels(q.baseCtx) atShutdown(q.Shutdown) atTerminate(q.Terminate) log.Debug("ChannelUniqueQueue: %s Starting", q.name) @@ -226,6 +228,7 @@ func (q *ChannelUniqueQueue) Terminate() { default: } q.terminateCtxCancel() + q.baseCtxFinished() log.Debug("ChannelUniqueQueue: %s Terminated", q.name) } diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go index ef6752079e..6daf3fc96e 100644 --- a/modules/queue/unique_queue_channel_test.go +++ b/modules/queue/unique_queue_channel_test.go @@ -32,9 +32,9 @@ func TestChannelUniqueQueue(t *testing.T) { BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, BoostWorkers: 5, + Name: "TestChannelQueue", }, Workers: 0, - Name: "TestChannelQueue", }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 5ee1c396fc..6ab03094ba 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -6,6 +6,7 @@ package queue import ( "context" + "runtime/pprof" "sync" "time" @@ -72,9 +73,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac BoostTimeout: config.BoostTimeout, BoostWorkers: config.BoostWorkers, MaxWorkers: config.MaxWorkers, + Name: config.Name + "-channel", }, Workers: config.Workers, - Name: config.Name + "-channel", }, exemplar) if err != nil { return nil, err @@ -90,9 +91,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac BoostTimeout: 5 * time.Minute, BoostWorkers: 1, MaxWorkers: 5, + Name: config.Name + "-level", }, Workers: 0, - Name: config.Name + "-level", }, DataDir: config.DataDir, } @@ -183,6 +184,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { // Run starts to run the queue func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { + pprof.SetGoroutineLabels(q.channelQueue.baseCtx) log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name) q.lock.Lock() @@ -301,6 +303,7 @@ func (q *PersistableChannelUniqueQueue) Terminate() { if q.internal != nil { q.internal.(*LevelUniqueQueue).Terminate() } + q.channelQueue.baseCtxFinished() log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name) } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 5f6ec18710..2d8504598a 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -6,11 +6,14 @@ package queue import ( "context" + "fmt" + "runtime/pprof" "sync" "sync/atomic" "time" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/util" ) @@ -22,6 +25,7 @@ type WorkerPool struct { lock sync.Mutex baseCtx context.Context baseCtxCancel context.CancelFunc + baseCtxFinished process.FinishedFunc paused chan struct{} resumed chan struct{} cond *sync.Cond @@ -44,6 +48,7 @@ var ( // WorkerPoolConfiguration is the basic configuration for a WorkerPool type WorkerPoolConfiguration struct { + Name string QueueLength int BatchLength int BlockTimeout time.Duration @@ -54,12 +59,13 @@ type WorkerPoolConfiguration struct { // NewWorkerPool creates a new worker pool func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Queue: %s", config.Name), process.SystemProcessType, false) dataChan := make(chan Data, config.QueueLength) pool := &WorkerPool{ baseCtx: ctx, baseCtxCancel: cancel, + baseCtxFinished: finished, batchLength: config.BatchLength, dataChan: dataChan, resumed: closedChan, @@ -299,6 +305,7 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, p.numberOfWorkers++ p.lock.Unlock() go func() { + pprof.SetGoroutineLabels(ctx) p.doWork(ctx) p.lock.Lock() @@ -476,6 +483,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { } func (p *WorkerPool) doWork(ctx context.Context) { + pprof.SetGoroutineLabels(ctx) delay := time.Millisecond * 300 // Create a common timer - we will use this elsewhere |