aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-03-31 18:01:43 +0100
committerGitHub <noreply@github.com>2022-03-31 19:01:43 +0200
commitc88547ce71a554091930e129c20776daf6da35ac (patch)
tree9232a7b0f07686698a9adbb51a3d3d72ebeaf12b /modules/queue
parent9c349a4277926bfd3ff0360301765ad7abd9f10b (diff)
downloadgitea-c88547ce71a554091930e129c20776daf6da35ac.tar.gz
gitea-c88547ce71a554091930e129c20776daf6da35ac.zip
Add Goroutine stack inspector to admin/monitor (#19207)
Continues on from #19202. Following the addition of pprof labels we can now more easily understand the relationship between a goroutine and the requests that spawn them. This PR takes advantage of the labels and adds a few others, then provides a mechanism for the monitoring page to query the pprof goroutine profile. The binary profile that results from this profile is immediately piped in to the google library for parsing this and then stack traces are formed for the goroutines. If the goroutine is within a context or has been created from a goroutine within a process context it will acquire the process description labels for that process. The goroutines are mapped with there associate pids and any that do not have an associated pid are placed in a group at the bottom as unbound. In this way we should be able to more easily examine goroutines that have been stuck. A manager command `gitea manager processes` is also provided that can export the processes (with or without stacktraces) to the command line. Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/queue_bytefifo.go4
-rw-r--r--modules/queue/queue_channel.go4
-rw-r--r--modules/queue/queue_channel_test.go2
-rw-r--r--modules/queue/queue_disk_channel.go6
-rw-r--r--modules/queue/unique_queue_channel.go3
-rw-r--r--modules/queue/unique_queue_channel_test.go2
-rw-r--r--modules/queue/unique_queue_disk_channel.go7
-rw-r--r--modules/queue/workerpool.go10
8 files changed, 29 insertions, 9 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index ead3828f33..99c6428abc 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -20,7 +21,6 @@ import (
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
- Name string
WaitOnEmpty bool
}
@@ -153,6 +153,7 @@ func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
// Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)
@@ -355,6 +356,7 @@ func (q *ByteFIFOQueue) Terminate() {
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
+ q.baseCtxFinished()
log.Debug("%s: %s Terminated", q.typ, q.name)
}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 5469c03100..028023d500 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync/atomic"
"time"
@@ -20,7 +21,6 @@ const ChannelQueueType Type = "channel"
type ChannelQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
- Name string
}
// ChannelQueue implements Queue
@@ -84,6 +84,7 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
// Run starts to run the queue
func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("ChannelQueue: %s Starting", q.name)
@@ -169,6 +170,7 @@ func (q *ChannelQueue) Terminate() {
default:
}
q.terminateCtxCancel()
+ q.baseCtxFinished()
log.Debug("ChannelQueue: %s Terminated", q.name)
}
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index 26a635b918..d30b908861 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -34,9 +34,9 @@ func TestChannelQueue(t *testing.T) {
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
+ Name: "TestChannelQueue",
},
Workers: 0,
- Name: "TestChannelQueue",
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 0494698e0e..014d93f5b5 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -72,9 +73,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
+ Name: config.Name + "-channel",
},
Workers: config.Workers,
- Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -90,9 +91,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
MaxWorkers: 5,
+ Name: config.Name + "-level",
},
Workers: 0,
- Name: config.Name + "-level",
},
DataDir: config.DataDir,
}
@@ -154,6 +155,7 @@ func (q *PersistableChannelQueue) PushBack(data Data) error {
// Run starts to run the queue
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index b7282e6c6c..6e8d37a20c 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -97,6 +98,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
// Run starts to run the queue
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
@@ -226,6 +228,7 @@ func (q *ChannelUniqueQueue) Terminate() {
default:
}
q.terminateCtxCancel()
+ q.baseCtxFinished()
log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
}
diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go
index ef6752079e..6daf3fc96e 100644
--- a/modules/queue/unique_queue_channel_test.go
+++ b/modules/queue/unique_queue_channel_test.go
@@ -32,9 +32,9 @@ func TestChannelUniqueQueue(t *testing.T) {
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
+ Name: "TestChannelQueue",
},
Workers: 0,
- Name: "TestChannelQueue",
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index 5ee1c396fc..6ab03094ba 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -6,6 +6,7 @@ package queue
import (
"context"
+ "runtime/pprof"
"sync"
"time"
@@ -72,9 +73,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
+ Name: config.Name + "-channel",
},
Workers: config.Workers,
- Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -90,9 +91,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
MaxWorkers: 5,
+ Name: config.Name + "-level",
},
Workers: 0,
- Name: config.Name + "-level",
},
DataDir: config.DataDir,
}
@@ -183,6 +184,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
// Run starts to run the queue
func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
q.lock.Lock()
@@ -301,6 +303,7 @@ func (q *PersistableChannelUniqueQueue) Terminate() {
if q.internal != nil {
q.internal.(*LevelUniqueQueue).Terminate()
}
+ q.channelQueue.baseCtxFinished()
log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
}
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 5f6ec18710..2d8504598a 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -6,11 +6,14 @@ package queue
import (
"context"
+ "fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/util"
)
@@ -22,6 +25,7 @@ type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
baseCtxCancel context.CancelFunc
+ baseCtxFinished process.FinishedFunc
paused chan struct{}
resumed chan struct{}
cond *sync.Cond
@@ -44,6 +48,7 @@ var (
// WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WorkerPoolConfiguration struct {
+ Name string
QueueLength int
BatchLength int
BlockTimeout time.Duration
@@ -54,12 +59,13 @@ type WorkerPoolConfiguration struct {
// NewWorkerPool creates a new worker pool
func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Queue: %s", config.Name), process.SystemProcessType, false)
dataChan := make(chan Data, config.QueueLength)
pool := &WorkerPool{
baseCtx: ctx,
baseCtxCancel: cancel,
+ baseCtxFinished: finished,
batchLength: config.BatchLength,
dataChan: dataChan,
resumed: closedChan,
@@ -299,6 +305,7 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.numberOfWorkers++
p.lock.Unlock()
go func() {
+ pprof.SetGoroutineLabels(ctx)
p.doWork(ctx)
p.lock.Lock()
@@ -476,6 +483,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
}
func (p *WorkerPool) doWork(ctx context.Context) {
+ pprof.SetGoroutineLabels(ctx)
delay := time.Millisecond * 300
// Create a common timer - we will use this elsewhere