summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-03-31 18:01:43 +0100
committerGitHub <noreply@github.com>2022-03-31 19:01:43 +0200
commitc88547ce71a554091930e129c20776daf6da35ac (patch)
tree9232a7b0f07686698a9adbb51a3d3d72ebeaf12b /modules
parent9c349a4277926bfd3ff0360301765ad7abd9f10b (diff)
downloadgitea-c88547ce71a554091930e129c20776daf6da35ac.tar.gz
gitea-c88547ce71a554091930e129c20776daf6da35ac.zip
Add Goroutine stack inspector to admin/monitor (#19207)
Continues on from #19202. Following the addition of pprof labels we can now more easily understand the relationship between goroutines and the requests that spawn them. This PR takes advantage of the labels and adds a few others, then provides a mechanism for the monitoring page to query the pprof goroutine profile. The binary profile that results from this query is immediately piped into the google pprof library for parsing, and stack traces are then formed for the goroutines. If the goroutine is within a context or has been created from a goroutine within a process context it will acquire the process description labels for that process. The goroutines are mapped with their associated pids and any that do not have an associated pid are placed in a group at the bottom as unbound. In this way we should be able to more easily examine goroutines that have been stuck. A manager command `gitea manager processes` is also provided that can export the processes (with or without stacktraces) to the command line. Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules')
-rw-r--r--modules/context/private.go2
-rw-r--r--modules/eventsource/manager_run.go4
-rw-r--r--modules/graceful/manager_unix.go6
-rw-r--r--modules/indexer/code/indexer.go7
-rw-r--r--modules/indexer/issues/indexer.go12
-rw-r--r--modules/log/event.go21
-rw-r--r--modules/log/multichannel.go2
-rw-r--r--modules/nosql/manager.go9
-rw-r--r--modules/nosql/manager_leveldb.go40
-rw-r--r--modules/nosql/manager_redis.go27
-rw-r--r--modules/private/manager.go23
-rw-r--r--modules/process/error.go26
-rw-r--r--modules/process/manager.go225
-rw-r--r--modules/process/manager_exec.go79
-rw-r--r--modules/process/manager_stacktraces.go355
-rw-r--r--modules/process/manager_test.go8
-rw-r--r--modules/process/process.go65
-rw-r--r--modules/queue/queue_bytefifo.go4
-rw-r--r--modules/queue/queue_channel.go4
-rw-r--r--modules/queue/queue_channel_test.go2
-rw-r--r--modules/queue/queue_disk_channel.go6
-rw-r--r--modules/queue/unique_queue_channel.go3
-rw-r--r--modules/queue/unique_queue_channel_test.go2
-rw-r--r--modules/queue/unique_queue_disk_channel.go7
-rw-r--r--modules/queue/workerpool.go10
-rw-r--r--modules/ssh/ssh.go8
-rw-r--r--modules/web/routing/logger_manager.go7
27 files changed, 732 insertions, 232 deletions
diff --git a/modules/context/private.go b/modules/context/private.go
index 6e5ef1bd12..b57ba102e6 100644
--- a/modules/context/private.go
+++ b/modules/context/private.go
@@ -79,6 +79,6 @@ func PrivateContexter() func(http.Handler) http.Handler {
// the underlying request has timed out from the ssh/http push
func OverrideContext(ctx *PrivateContext) (cancel context.CancelFunc) {
// We now need to override the request context as the base for our work because even if the request is cancelled we have to continue this work
- ctx.Override, _, cancel = process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("PrivateContext: %s", ctx.Req.RequestURI))
+ ctx.Override, _, cancel = process.GetManager().AddTypedContext(graceful.GetManager().HammerContext(), fmt.Sprintf("PrivateContext: %s", ctx.Req.RequestURI), process.RequestProcessType, true)
return
}
diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go
index 60598ecb49..9af5c9e78a 100644
--- a/modules/eventsource/manager_run.go
+++ b/modules/eventsource/manager_run.go
@@ -11,6 +11,7 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)
@@ -25,6 +26,9 @@ func (m *Manager) Init() {
// Run runs the manager within a provided context
func (m *Manager) Run(ctx context.Context) {
+ ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true)
+ defer finished()
+
then := timeutil.TimeStampNow().Add(-2)
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
loop:
diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go
index 6fbb2bda29..b22b7b5860 100644
--- a/modules/graceful/manager_unix.go
+++ b/modules/graceful/manager_unix.go
@@ -18,6 +18,7 @@ import (
"time"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
)
@@ -73,7 +74,7 @@ func (g *Manager) start(ctx context.Context) {
// Set the running state & handle signals
g.setState(stateRunning)
- go g.handleSignals(ctx)
+ go g.handleSignals(g.managerCtx)
// Handle clean up of unused provided listeners and delayed start-up
startupDone := make(chan struct{})
@@ -112,6 +113,9 @@ func (g *Manager) start(ctx context.Context) {
}
func (g *Manager) handleSignals(ctx context.Context) {
+ ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Graceful: HandleSignals", process.SystemProcessType, true)
+ defer finished()
+
signalChannel := make(chan os.Signal, 1)
signal.Notify(
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index d897fcccd5..3ead3261e9 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -7,6 +7,7 @@ package code
import (
"context"
"os"
+ "runtime/pprof"
"strconv"
"strings"
"time"
@@ -15,6 +16,7 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
@@ -116,7 +118,7 @@ func Init() {
return
}
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), "Service: CodeIndexer", process.SystemProcessType, false)
graceful.GetManager().RunAtTerminate(func() {
select {
@@ -128,6 +130,7 @@ func Init() {
log.Debug("Closing repository indexer")
indexer.Close()
log.Info("PID: %d Repository Indexer closed", os.Getpid())
+ finished()
})
waitChannel := make(chan time.Duration)
@@ -172,6 +175,7 @@ func Init() {
}
go func() {
+ pprof.SetGoroutineLabels(ctx)
start := time.Now()
var (
rIndexer Indexer
@@ -247,6 +251,7 @@ func Init() {
if setting.Indexer.StartupTimeout > 0 {
go func() {
+ pprof.SetGoroutineLabels(ctx)
timeout := setting.Indexer.StartupTimeout
if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 3aaa27eed2..1343b0bddd 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -8,6 +8,7 @@ import (
"context"
"fmt"
"os"
+ "runtime/pprof"
"sync"
"time"
@@ -16,6 +17,7 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
@@ -100,6 +102,8 @@ var (
// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
// all issue index done.
func InitIssueIndexer(syncReindex bool) {
+ ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
+
waitChannel := make(chan time.Duration)
// Create the Queue
@@ -165,6 +169,7 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Indexer
go func() {
+ pprof.SetGoroutineLabels(ctx)
start := time.Now()
log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
var populate bool
@@ -193,11 +198,13 @@ func InitIssueIndexer(syncReindex bool) {
if issueIndexer != nil {
issueIndexer.Close()
}
+ finished()
log.Info("PID: %d Issue Indexer closed", os.Getpid())
})
log.Debug("Created Bleve Indexer")
case "elasticsearch":
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(ctx)
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
if err != nil {
log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
@@ -208,10 +215,12 @@ func InitIssueIndexer(syncReindex bool) {
}
populate = !exist
holder.set(issueIndexer)
+ atTerminate(finished)
})
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
+ graceful.GetManager().RunAtTerminate(finished)
default:
holder.cancel()
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
@@ -251,6 +260,7 @@ func InitIssueIndexer(syncReindex bool) {
}
} else if setting.Indexer.StartupTimeout > 0 {
go func() {
+ pprof.SetGoroutineLabels(ctx)
timeout := setting.Indexer.StartupTimeout
if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
@@ -272,6 +282,8 @@ func InitIssueIndexer(syncReindex bool) {
// populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer(ctx context.Context) {
+ ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true)
+ defer finished()
for page := 1; ; page++ {
select {
case <-ctx.Done():
diff --git a/modules/log/event.go b/modules/log/event.go
index b20dac17c7..f66ecd179b 100644
--- a/modules/log/event.go
+++ b/modules/log/event.go
@@ -5,9 +5,13 @@
package log
import (
+ "context"
"fmt"
+ "runtime/pprof"
"sync"
"time"
+
+ "code.gitea.io/gitea/modules/process"
)
// Event represents a logging event
@@ -34,6 +38,8 @@ type EventLogger interface {
// ChannelledLog represents a cached channel to a LoggerProvider
type ChannelledLog struct {
+ ctx context.Context
+ finished context.CancelFunc
name string
provider string
queue chan *Event
@@ -44,8 +50,9 @@ type ChannelledLog struct {
}
// NewChannelledLog a new logger instance with given logger provider and config.
-func NewChannelledLog(name, provider, config string, bufferLength int64) (*ChannelledLog, error) {
+func NewChannelledLog(parent context.Context, name, provider, config string, bufferLength int64) (*ChannelledLog, error) {
if log, ok := providers[provider]; ok {
+
l := &ChannelledLog{
queue: make(chan *Event, bufferLength),
flush: make(chan bool),
@@ -58,6 +65,7 @@ func NewChannelledLog(name, provider, config string, bufferLength int64) (*Chann
}
l.name = name
l.provider = provider
+ l.ctx, _, l.finished = process.GetManager().AddTypedContext(parent, fmt.Sprintf("Logger: %s(%s)", l.name, l.provider), process.SystemProcessType, false)
go l.Start()
return l, nil
}
@@ -66,6 +74,8 @@ func NewChannelledLog(name, provider, config string, bufferLength int64) (*Chann
// Start processing the ChannelledLog
func (l *ChannelledLog) Start() {
+ pprof.SetGoroutineLabels(l.ctx)
+ defer l.finished()
for {
select {
case event, ok := <-l.queue:
@@ -140,6 +150,8 @@ func (l *ChannelledLog) GetName() string {
// MultiChannelledLog represents a cached channel to a LoggerProvider
type MultiChannelledLog struct {
+ ctx context.Context
+ finished context.CancelFunc
name string
bufferLength int64
queue chan *Event
@@ -156,7 +168,11 @@ type MultiChannelledLog struct {
// NewMultiChannelledLog a new logger instance with given logger provider and config.
func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog {
+ ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Logger: %s", name), process.SystemProcessType, false)
+
m := &MultiChannelledLog{
+ ctx: ctx,
+ finished: finished,
name: name,
queue: make(chan *Event, bufferLength),
flush: make(chan bool),
@@ -277,6 +293,9 @@ func (m *MultiChannelledLog) Start() {
m.rwmutex.Unlock()
return
}
+ pprof.SetGoroutineLabels(m.ctx)
+ defer m.finished()
+
m.started = true
m.rwmutex.Unlock()
paused := false
diff --git a/modules/log/multichannel.go b/modules/log/multichannel.go
index 8d94eb2b22..273df81df1 100644
--- a/modules/log/multichannel.go
+++ b/modules/log/multichannel.go
@@ -31,7 +31,7 @@ func newLogger(name string, buffer int64) *MultiChannelledLogger {
// SetLogger sets new logger instance with given logger provider and config.
func (l *MultiChannelledLogger) SetLogger(name, provider, config string) error {
- eventLogger, err := NewChannelledLog(name, provider, config, l.bufferLength)
+ eventLogger, err := NewChannelledLog(l.ctx, name, provider, config, l.bufferLength)
if err != nil {
return fmt.Errorf("Failed to create sublogger (%s): %v", name, err)
}
diff --git a/modules/nosql/manager.go b/modules/nosql/manager.go
index a89b5bb633..dab30812ce 100644
--- a/modules/nosql/manager.go
+++ b/modules/nosql/manager.go
@@ -5,10 +5,12 @@
package nosql
import (
+ "context"
"strconv"
"sync"
"time"
+ "code.gitea.io/gitea/modules/process"
"github.com/go-redis/redis/v8"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -17,7 +19,9 @@ var manager *Manager
// Manager is the nosql connection manager
type Manager struct {
- mutex sync.Mutex
+ ctx context.Context
+ finished context.CancelFunc
+ mutex sync.Mutex
RedisConnections map[string]*redisClientHolder
LevelDBConnections map[string]*levelDBHolder
@@ -46,7 +50,10 @@ func init() {
// GetManager returns a Manager and initializes one as singleton is there's none yet
func GetManager() *Manager {
if manager == nil {
+ ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: NoSQL", process.SystemProcessType, false)
manager = &Manager{
+ ctx: ctx,
+ finished: finished,
RedisConnections: make(map[string]*redisClientHolder),
LevelDBConnections: make(map[string]*levelDBHolder),
}
diff --git a/modules/nosql/manager_leveldb.go b/modules/nosql/manager_leveldb.go
index de4ef14d7d..d69ae88800 100644
--- a/modules/nosql/manager_leveldb.go
+++ b/modules/nosql/manager_leveldb.go
@@ -7,6 +7,7 @@ package nosql
import (
"fmt"
"path"
+ "runtime/pprof"
"strconv"
"strings"
@@ -50,7 +51,31 @@ func (m *Manager) CloseLevelDB(connection string) error {
}
// GetLevelDB gets a levelDB for a particular connection
-func (m *Manager) GetLevelDB(connection string) (*leveldb.DB, error) {
+func (m *Manager) GetLevelDB(connection string) (db *leveldb.DB, err error) {
+ // Because we want to associate any goroutines created by this call with the main nosqldb context we need to
+ // wrap this in a goroutine labelled with the nosqldb context
+ done := make(chan struct{})
+ var recovered interface{}
+ go func() {
+ defer func() {
+ recovered = recover()
+ if recovered != nil {
+ log.Critical("PANIC during GetLevelDB: %v\nStacktrace: %s", recovered, log.Stack(2))
+ }
+ close(done)
+ }()
+ pprof.SetGoroutineLabels(m.ctx)
+
+ db, err = m.getLevelDB(connection)
+ }()
+ <-done
+ if recovered != nil {
+ panic(recovered)
+ }
+ return
+}
+
+func (m *Manager) getLevelDB(connection string) (*leveldb.DB, error) {
// Convert the provided connection description to the common format
uri := ToLevelDBURI(connection)
@@ -168,15 +193,18 @@ func (m *Manager) GetLevelDB(connection string) (*leveldb.DB, error) {
if err != nil {
if !errors.IsCorrupted(err) {
if strings.Contains(err.Error(), "resource temporarily unavailable") {
- return nil, fmt.Errorf("unable to lock level db at %s: %w", dataDir, err)
+ err = fmt.Errorf("unable to lock level db at %s: %w", dataDir, err)
+ return nil, err
}
- return nil, fmt.Errorf("unable to open level db at %s: %w", dataDir, err)
- }
- db.db, err = leveldb.RecoverFile(dataDir, opts)
- if err != nil {
+ err = fmt.Errorf("unable to open level db at %s: %w", dataDir, err)
return nil, err
}
+ db.db, err = leveldb.RecoverFile(dataDir, opts)
+ }
+
+ if err != nil {
+ return nil, err
}
for _, name := range db.name {
diff --git a/modules/nosql/manager_redis.go b/modules/nosql/manager_redis.go
index 0ff01dcac2..b82f899db0 100644
--- a/modules/nosql/manager_redis.go
+++ b/modules/nosql/manager_redis.go
@@ -8,6 +8,7 @@ import (
"crypto/tls"
"net/url"
"path"
+ "runtime/pprof"
"strconv"
"strings"
@@ -43,7 +44,31 @@ func (m *Manager) CloseRedisClient(connection string) error {
}
// GetRedisClient gets a redis client for a particular connection
-func (m *Manager) GetRedisClient(connection string) redis.UniversalClient {
+func (m *Manager) GetRedisClient(connection string) (client redis.UniversalClient) {
+ // Because we want to associate any goroutines created by this call with the main nosqldb context we need to
+ // wrap this in a goroutine labelled with the nosqldb context
+ done := make(chan struct{})
+ var recovered interface{}
+ go func() {
+ defer func() {
+ recovered = recover()
+ if recovered != nil {
+ log.Critical("PANIC during GetRedisClient: %v\nStacktrace: %s", recovered, log.Stack(2))
+ }
+ close(done)
+ }()
+ pprof.SetGoroutineLabels(m.ctx)
+
+ client = m.getRedisClient(connection)
+ }()
+ <-done
+ if recovered != nil {
+ panic(recovered)
+ }
+ return
+}
+
+func (m *Manager) getRedisClient(connection string) redis.UniversalClient {
m.mutex.Lock()
defer m.mutex.Unlock()
client, ok := m.RedisConnections[connection]
diff --git a/modules/private/manager.go b/modules/private/manager.go
index 2543e141ea..8405bf2c83 100644
--- a/modules/private/manager.go
+++ b/modules/private/manager.go
@@ -7,6 +7,7 @@ package private
import (
"context"
"fmt"
+ "io"
"net/http"
"net/url"
"time"
@@ -189,3 +190,25 @@ func RemoveLogger(ctx context.Context, group, name string) (int, string) {
return http.StatusOK, "Removed"
}
+
+// Processes returns the current processes from this gitea instance
+func Processes(ctx context.Context, out io.Writer, flat, noSystem, stacktraces, json bool, cancel string) (int, string) {
+ reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/processes?flat=%t&no-system=%t&stacktraces=%t&json=%t&cancel-pid=%s", flat, noSystem, stacktraces, json, url.QueryEscape(cancel))
+
+ req := newInternalRequest(ctx, reqURL, "GET")
+ resp, err := req.Response()
+ if err != nil {
+ return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return resp.StatusCode, decodeJSONError(resp).Err
+ }
+
+ _, err = io.Copy(out, resp.Body)
+ if err != nil {
+ return http.StatusInternalServerError, err.Error()
+ }
+ return http.StatusOK, ""
+}
diff --git a/modules/process/error.go b/modules/process/error.go
new file mode 100644
index 0000000000..7a72bda40e
--- /dev/null
+++ b/modules/process/error.go
@@ -0,0 +1,26 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package process
+
+import "fmt"
+
+// Error is a wrapped error describing the error results of Process Execution
+type Error struct {
+ PID IDType
+ Description string
+ Err error
+ CtxErr error
+ Stdout string
+ Stderr string
+}
+
+func (err *Error) Error() string {
+ return fmt.Sprintf("exec(%s:%s) failed: %v(%v) stdout: %s stderr: %s", err.PID, err.Description, err.Err, err.CtxErr, err.Stdout, err.Stderr)
+}
+
+// Unwrap implements the unwrappable implicit interface for go1.13 Unwrap()
+func (err *Error) Unwrap() error {
+ return err.Err
+}
diff --git a/modules/process/manager.go b/modules/process/manager.go
index 26dd6d535f..5d7aee760f 100644
--- a/modules/process/manager.go
+++ b/modules/process/manager.go
@@ -6,13 +6,8 @@
package process
import (
- "bytes"
"context"
- "fmt"
- "io"
- "os/exec"
"runtime/pprof"
- "sort"
"strconv"
"sync"
"time"
@@ -30,6 +25,18 @@ var (
DefaultContext = context.Background()
)
+// DescriptionPProfLabel is a label set on goroutines that have a process attached
+const DescriptionPProfLabel = "process-description"
+
+// PIDPProfLabel is a label set on goroutines that have a process attached
+const PIDPProfLabel = "pid"
+
+// PPIDPProfLabel is a label set on goroutines that have a process attached
+const PPIDPProfLabel = "ppid"
+
+// ProcessTypePProfLabel is a label set on goroutines that have a process attached
+const ProcessTypePProfLabel = "process-type"
+
// IDType is a pid type
type IDType string
@@ -44,15 +51,15 @@ type Manager struct {
next int64
lastTime int64
- processes map[IDType]*Process
+ processMap map[IDType]*process
}
// GetManager returns a Manager and initializes one as singleton if there's none yet
func GetManager() *Manager {
managerInit.Do(func() {
manager = &Manager{
- processes: make(map[IDType]*Process),
- next: 1,
+ processMap: make(map[IDType]*process),
+ next: 1,
}
})
return manager
@@ -69,12 +76,25 @@ func GetManager() *Manager {
func (pm *Manager) AddContext(parent context.Context, description string) (ctx context.Context, cancel context.CancelFunc, finished FinishedFunc) {
ctx, cancel = context.WithCancel(parent)
- ctx, pid, finished := pm.Add(ctx, description, cancel)
+ ctx, _, finished = pm.Add(ctx, description, cancel, NormalProcessType, true)
- return &Context{
- Context: ctx,
- pid: pid,
- }, cancel, finished
+ return ctx, cancel, finished
+}
+
+// AddTypedContext creates a new context and adds it as a process. Once the process is finished, finished must be called
+// to remove the process from the process table. It should not be called until the process is finished but must always be called.
+//
+// cancel should be used to cancel the returned context, however it will not remove the process from the process table.
+// finished will cancel the returned context and remove it from the process table.
+//
+// Most processes will not need to use the cancel function but there will be cases whereby you want to cancel the process but not immediately remove it from the
+// process table.
+func (pm *Manager) AddTypedContext(parent context.Context, description, processType string, currentlyRunning bool) (ctx context.Context, cancel context.CancelFunc, finished FinishedFunc) {
+ ctx, cancel = context.WithCancel(parent)
+
+ ctx, _, finished = pm.Add(ctx, description, cancel, processType, currentlyRunning)
+
+ return ctx, cancel, finished
}
// AddContextTimeout creates a new context and add it as a process. Once the process is finished, finished must be called
@@ -90,52 +110,61 @@ func (pm *Manager) AddContextTimeout(parent context.Context, timeout time.Durati
// it's meaningless to use timeout <= 0, and it must be a bug! so we must panic here to tell developers to make the timeout correct
panic("the timeout must be greater than zero, otherwise the context will be cancelled immediately")
}
+
ctx, cancel = context.WithTimeout(parent, timeout)
- ctx, pid, finshed := pm.Add(ctx, description, cancel)
+ ctx, _, finshed = pm.Add(ctx, description, cancel, NormalProcessType, true)
- return &Context{
- Context: ctx,
- pid: pid,
- }, cancel, finshed
+ return ctx, cancel, finshed
}
// Add create a new process
-func (pm *Manager) Add(ctx context.Context, description string, cancel context.CancelFunc) (context.Context, IDType, FinishedFunc) {
+func (pm *Manager) Add(ctx context.Context, description string, cancel context.CancelFunc, processType string, currentlyRunning bool) (context.Context, IDType, FinishedFunc) {
parentPID := GetParentPID(ctx)
pm.mutex.Lock()
start, pid := pm.nextPID()
- parent := pm.processes[parentPID]
+ parent := pm.processMap[parentPID]
if parent == nil {
parentPID = ""
}
- process := &Process{
+ process := &process{
PID: pid,
ParentPID: parentPID,
Description: description,
Start: start,
Cancel: cancel,
+ Type: processType,
}
- finished := func() {
- cancel()
- pm.remove(process)
- pprof.SetGoroutineLabels(ctx)
+ var finished FinishedFunc
+ if currentlyRunning {
+ finished = func() {
+ cancel()
+ pm.remove(process)
+ pprof.SetGoroutineLabels(ctx)
+ }
+ } else {
+ finished = func() {
+ cancel()
+ pm.remove(process)
+ }
}
- if parent != nil {
- parent.AddChild(process)
- }
- pm.processes[pid] = process
+ pm.processMap[pid] = process
pm.mutex.Unlock()
- pprofCtx := pprof.WithLabels(ctx, pprof.Labels("process-description", description, "ppid", string(parentPID), "pid", string(pid)))
- pprof.SetGoroutineLabels(pprofCtx)
+ pprofCtx := pprof.WithLabels(ctx, pprof.Labels(DescriptionPProfLabel, description, PPIDPProfLabel, string(parentPID), PIDPProfLabel, string(pid), ProcessTypePProfLabel, processType))
+ if currentlyRunning {
+ pprof.SetGoroutineLabels(pprofCtx)
+ }
- return pprofCtx, pid, finished
+ return &Context{
+ Context: pprofCtx,
+ pid: pid,
+ }, pid, finished
}
// nextPID will return the next available PID. pm.mutex should already be locked.
@@ -160,142 +189,24 @@ func (pm *Manager) nextPID() (start time.Time, pid IDType) {
// Remove a process from the ProcessManager.
func (pm *Manager) Remove(pid IDType) {
pm.mutex.Lock()
- delete(pm.processes, pid)
+ delete(pm.processMap, pid)
pm.mutex.Unlock()
}
-func (pm *Manager) remove(process *Process) {
+func (pm *Manager) remove(process *process) {
pm.mutex.Lock()
- if p := pm.processes[process.PID]; p == process {
- delete(pm.processes, process.PID)
+ defer pm.mutex.Unlock()
+ if p := pm.processMap[process.PID]; p == process {
+ delete(pm.processMap, process.PID)
}
- parent := pm.processes[process.ParentPID]
- pm.mutex.Unlock()
-
- if parent == nil {
- return
- }
-
- parent.RemoveChild(process)
}
// Cancel a process in the ProcessManager.
func (pm *Manager) Cancel(pid IDType) {
pm.mutex.Lock()
- process, ok := pm.processes[pid]
+ process, ok := pm.processMap[pid]
pm.mutex.Unlock()
- if ok {
+ if ok && process.Type != SystemProcessType {
process.Cancel()
}
}
-
-// Processes gets the processes in a thread safe manner
-func (pm *Manager) Processes(onlyRoots bool) []*Process {
- pm.mutex.Lock()
- processes := make([]*Process, 0, len(pm.processes))
- if onlyRoots {
- for _, process := range pm.processes {
- if _, has := pm.processes[process.ParentPID]; !has {
- processes = append(processes, process)
- }
- }
- } else {
- for _, process := range pm.processes {
- processes = append(processes, process)
- }
- }
- pm.mutex.Unlock()
-
- sort.Slice(processes, func(i, j int) bool {
- left, right := processes[i], processes[j]
-
- return left.Start.Before(right.Start)
- })
-
- return processes
-}
-
-// Exec a command and use the default timeout.
-func (pm *Manager) Exec(desc, cmdName string, args ...string) (string, string, error) {
- return pm.ExecDir(DefaultContext, -1, "", desc, cmdName, args...)
-}
-
-// ExecTimeout a command and use a specific timeout duration.
-func (pm *Manager) ExecTimeout(timeout time.Duration, desc, cmdName string, args ...string) (string, string, error) {
- return pm.ExecDir(DefaultContext, timeout, "", desc, cmdName, args...)
-}
-
-// ExecDir a command and use the default timeout.
-func (pm *Manager) ExecDir(ctx context.Context, timeout time.Duration, dir, desc, cmdName string, args ...string) (string, string, error) {
- return pm.ExecDirEnv(ctx, timeout, dir, desc, nil, cmdName, args...)
-}
-
-// ExecDirEnv runs a command in given path and environment variables, and waits for its completion
-// up to the given timeout (or DefaultTimeout if -1 is given).
-// Returns its complete stdout and stderr
-// outputs and an error, if any (including timeout)
-func (pm *Manager) ExecDirEnv(ctx context.Context, timeout time.Duration, dir, desc string, env []string, cmdName string, args ...string) (string, string, error) {
- return pm.ExecDirEnvStdIn(ctx, timeout, dir, desc, env, nil, cmdName, args...)
-}
-
-// ExecDirEnvStdIn runs a command in given path and environment variables with provided stdIN, and waits for its completion
-// up to the given timeout (or DefaultTimeout if -1 is given).
-// Returns its complete stdout and stderr
-// outputs and an error, if any (including timeout)
-func (pm *Manager) ExecDirEnvStdIn(ctx context.Context, timeout time.Duration, dir, desc string, env []string, stdIn io.Reader, cmdName string, args ...string) (string, string, error) {
- if timeout <= 0 {
- timeout = 60 * time.Second
- }
-
- stdOut := new(bytes.Buffer)
- stdErr := new(bytes.Buffer)
-
- ctx, _, finished := pm.AddContextTimeout(ctx, timeout, desc)
- defer finished()
-
- cmd := exec.CommandContext(ctx, cmdName, args...)
- cmd.Dir = dir
- cmd.Env = env
- cmd.Stdout = stdOut
- cmd.Stderr = stdErr
- if stdIn != nil {
- cmd.Stdin = stdIn
- }
-
- if err := cmd.Start(); err != nil {
- return "", "", err
- }
-
- err := cmd.Wait()
- if err != nil {
- err = &Error{
- PID: GetPID(ctx),
- Description: desc,
- Err: err,
- CtxErr: ctx.Err(),
- Stdout: stdOut.String(),
- Stderr: stdErr.String(),
- }
- }
-
- return stdOut.String(), stdErr.String(), err
-}
-
-// Error is a wrapped error describing the error results of Process Execution
-type Error struct {
- PID IDType
- Description string
- Err error
- CtxErr error
- Stdout string
- Stderr string
-}
-
-func (err *Error) Error() string {
- return fmt.Sprintf("exec(%s:%s) failed: %v(%v) stdout: %s stderr: %s", err.PID, err.Description, err.Err, err.CtxErr, err.Stdout, err.Stderr)
-}
-
-// Unwrap implements the unwrappable implicit interface for go1.13 Unwrap()
-func (err *Error) Unwrap() error {
- return err.Err
-}
diff --git a/modules/process/manager_exec.go b/modules/process/manager_exec.go
new file mode 100644
index 0000000000..61ddae646f
--- /dev/null
+++ b/modules/process/manager_exec.go
@@ -0,0 +1,79 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package process
+
+import (
+ "bytes"
+ "context"
+ "io"
+ "os/exec"
+ "time"
+)
+
+// Exec a command and use the default timeout.
+func (pm *Manager) Exec(desc, cmdName string, args ...string) (string, string, error) {
+ return pm.ExecDir(DefaultContext, -1, "", desc, cmdName, args...)
+}
+
+// ExecTimeout a command and use a specific timeout duration.
+func (pm *Manager) ExecTimeout(timeout time.Duration, desc, cmdName string, args ...string) (string, string, error) {
+ return pm.ExecDir(DefaultContext, timeout, "", desc, cmdName, args...)
+}
+
+// ExecDir a command in the given directory and use the given timeout (or the default timeout if timeout <= 0 is given).
+func (pm *Manager) ExecDir(ctx context.Context, timeout time.Duration, dir, desc, cmdName string, args ...string) (string, string, error) {
+ return pm.ExecDirEnv(ctx, timeout, dir, desc, nil, cmdName, args...)
+}
+
+// ExecDirEnv runs a command in given path and environment variables, and waits for its completion
+// up to the given timeout (or DefaultTimeout if timeout <= 0 is given).
+// Returns its complete stdout and stderr
+// outputs and an error, if any (including timeout)
+func (pm *Manager) ExecDirEnv(ctx context.Context, timeout time.Duration, dir, desc string, env []string, cmdName string, args ...string) (string, string, error) {
+ return pm.ExecDirEnvStdIn(ctx, timeout, dir, desc, env, nil, cmdName, args...)
+}
+
+// ExecDirEnvStdIn runs a command in given path and environment variables with provided stdIN, and waits for its completion
+// up to the given timeout (or DefaultTimeout if timeout <= 0 is given).
+// Returns its complete stdout and stderr
+// outputs and an error, if any (including timeout)
+func (pm *Manager) ExecDirEnvStdIn(ctx context.Context, timeout time.Duration, dir, desc string, env []string, stdIn io.Reader, cmdName string, args ...string) (string, string, error) {
+ if timeout <= 0 {
+ timeout = 60 * time.Second
+ }
+
+ stdOut := new(bytes.Buffer)
+ stdErr := new(bytes.Buffer)
+
+ ctx, _, finished := pm.AddContextTimeout(ctx, timeout, desc)
+ defer finished()
+
+ cmd := exec.CommandContext(ctx, cmdName, args...)
+ cmd.Dir = dir
+ cmd.Env = env
+ cmd.Stdout = stdOut
+ cmd.Stderr = stdErr
+ if stdIn != nil {
+ cmd.Stdin = stdIn
+ }
+
+ if err := cmd.Start(); err != nil {
+ return "", "", err
+ }
+
+ err := cmd.Wait()
+ if err != nil {
+ err = &Error{
+ PID: GetPID(ctx),
+ Description: desc,
+ Err: err,
+ CtxErr: ctx.Err(),
+ Stdout: stdOut.String(),
+ Stderr: stdErr.String(),
+ }
+ }
+
+ return stdOut.String(), stdErr.String(), err
+}
diff --git a/modules/process/manager_stacktraces.go b/modules/process/manager_stacktraces.go
new file mode 100644
index 0000000000..fbe3374b87
--- /dev/null
+++ b/modules/process/manager_stacktraces.go
@@ -0,0 +1,355 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package process
+
+import (
+ "fmt"
+ "io"
+ "runtime/pprof"
+ "sort"
+ "time"
+
+ "github.com/google/pprof/profile"
+)
+
+// StackEntry is an entry on a stacktrace
+// ProcessStacktraces creates one entry per line of every location in a goroutine profile sample.
+type StackEntry struct {
+ // Function is the name of the function for this entry
+ Function string
+ // File is the filename recorded in the profile for that function
+ File string
+ // Line is the line number recorded in the profile for this entry
+ Line int
+}
+
+// Label represents a pprof label assigned to goroutine stack
+// Each Label holds a single name/value pair.
+type Label struct {
+ // Name is the key of the pprof label
+ Name string
+ // Value is the single value associated with Name
+ Value string
+}
+
+// Stack is a stacktrace relating to a goroutine. (Multiple goroutines may have the same stacktrace)
+type Stack struct {
+ Count int64 // Number of goroutines with this stack trace
+ // Description is a short name for the stack: ProcessStacktraces uses the function of the
+ // last entry (the bottom of the stack) or "(unknown)" if there are no entries
+ Description string
+ // Labels are the pprof labels of the goroutines; ProcessStacktraces leaves out the labels
+ // that merely identify the owning process
+ Labels []*Label `json:",omitempty"`
+ // Entry holds the entries of the stacktrace, ending with the bottom of the stack
+ Entry []*StackEntry `json:",omitempty"`
+}
+
+// A Process is a combined representation of a Process and a Stacktrace for the goroutines associated with it
+type Process struct {
+ // PID is the manager's ID for the process; it is empty for the "(unassociated)" holder
+ // created by ProcessStacktraces
+ PID IDType
+ // ParentPID is the PID of the parent process, or empty if there is none
+ ParentPID IDType
+ // Description is the human readable description of the process
+ Description string
+ // Start is the start time of the process; the synthetic entries created by
+ // ProcessStacktraces leave it at the zero value
+ Start time.Time
+ // Type is the process type, e.g. SystemProcessType or NoneProcessType
+ Type string
+
+ // Children are the child processes; only filled when a non-flat listing is requested
+ Children []*Process `json:",omitempty"`
+ // Stacks are the goroutine stacks associated with the process; filled by ProcessStacktraces
+ Stacks []*Stack `json:",omitempty"`
+}
+
+// Processes gets the processes in a thread safe manner
+//
+// The manager's process map is copied into exported Process values while the manager's
+// mutex is held. With flat set every process is returned in a single list, otherwise
+// processes whose parent is still registered are placed in that parent's Children and
+// only the remaining processes are returned at the top level.
+// With noSystem set, processes of SystemProcessType are left out: in a flat listing they
+// are skipped, in a tree listing a top-level system process is replaced by its children.
+// The second return value is the total number of processes registered with the manager.
+func (pm *Manager) Processes(flat, noSystem bool) ([]*Process, int) {
+	pm.mutex.Lock()
+	total := len(pm.processMap)
+	result := make([]*Process, 0, total)
+	if flat {
+		for _, internal := range pm.processMap {
+			if noSystem && internal.Type == SystemProcessType {
+				continue
+			}
+			result = append(result, internal.toProcess())
+		}
+	} else {
+		// Exported copies by PID, so that a parent that was converted early is reused
+		converted := make(map[IDType]*Process)
+		for _, internal := range pm.processMap {
+			current, seen := converted[internal.PID]
+			if !seen {
+				current = internal.toProcess()
+				converted[current.PID] = current
+			}
+
+			if current.ParentPID != "" {
+				if internalParent, registered := pm.processMap[internal.ParentPID]; registered {
+					parent, seen := converted[current.ParentPID]
+					if !seen {
+						parent = internalParent.toProcess()
+						converted[parent.PID] = parent
+					}
+					parent.Children = append(parent.Children, current)
+					continue
+				}
+			}
+
+			// Either there is no parent or the parent is no longer registered
+			result = append(result, current)
+		}
+	}
+	pm.mutex.Unlock()
+
+	if !flat && noSystem {
+		idx := 0
+		for idx < len(result) {
+			candidate := result[idx]
+			if candidate.Type != SystemProcessType {
+				idx++
+				continue
+			}
+			// Move the system process to the end, cut it off and put its children in its place.
+			// idx is not advanced because the element swapped into it has not been checked yet.
+			last := len(result) - 1
+			result[last], result[idx] = result[idx], result[last]
+			result = append(result[:last], candidate.Children...)
+		}
+	}
+
+	// Sort by process' start time. Oldest process appears first.
+	sort.Slice(result, func(i, j int) bool {
+		return result[i].Start.Before(result[j].Start)
+	})
+
+	return result, total
+}
+
+// ProcessStacktraces gets the processes and stacktraces in a thread safe manner
+//
+// The registered processes are copied and, while the manager's mutex is still held, the
+// "goroutine" pprof profile is captured and parsed. Every sample of the profile becomes a
+// Stack which is attached to the process named by the sample's PID label. Samples naming a
+// PID that is not registered are attached to a synthetic "(dead process)" entry and samples
+// without a PID label are collected under a synthetic "(unassociated)" entry.
+//
+// With flat set the processes are returned as a single list, otherwise children are nested in
+// the Children of their parent. With noSystem set, top-level processes of SystemProcessType or
+// NoneProcessType are removed and replaced by their children (if any).
+//
+// It returns the processes (newest first, at every level of the tree), the number of processes
+// registered with the manager, the number of goroutines in the profile and an error if the
+// profile could not be parsed or contained a label with an unexpected number of values.
+func (pm *Manager) ProcessStacktraces(flat, noSystem bool) ([]*Process, int, int64, error) {
+	var stacks *profile.Profile
+	var err error
+
+	// We cannot use the pm.ProcessMap here because we will release the mutex ...
+	processMap := map[IDType]*Process{}
+
+	// Lock the manager
+	pm.mutex.Lock()
+	processCount := len(pm.processMap)
+
+	// Add a defer to unlock in case there is a panic
+	unlocked := false
+	defer func() {
+		if !unlocked {
+			pm.mutex.Unlock()
+		}
+	}()
+
+	processes := make([]*Process, 0, len(pm.processMap))
+	if flat {
+		for _, internalProcess := range pm.processMap {
+			// System processes still go in the map so that their goroutines are not
+			// mistaken for goroutines of a dead process below.
+			process := internalProcess.toProcess()
+			processMap[process.PID] = process
+			if noSystem && internalProcess.Type == SystemProcessType {
+				continue
+			}
+			processes = append(processes, process)
+		}
+	} else {
+		for _, internalProcess := range pm.processMap {
+			process, ok := processMap[internalProcess.PID]
+			if !ok {
+				process = internalProcess.toProcess()
+				processMap[process.PID] = process
+			}
+
+			// Check its parent
+			if process.ParentPID == "" {
+				processes = append(processes, process)
+				continue
+			}
+
+			internalParentProcess, ok := pm.processMap[internalProcess.ParentPID]
+			if ok {
+				parentProcess, ok := processMap[process.ParentPID]
+				if !ok {
+					parentProcess = internalParentProcess.toProcess()
+					processMap[parentProcess.PID] = parentProcess
+				}
+				parentProcess.Children = append(parentProcess.Children, process)
+				continue
+			}
+
+			processes = append(processes, process)
+		}
+	}
+
+	// Now from within the lock we need to get the goroutines.
+	// Why? If we release the lock then between filling the above map and getting
+	// the stacktraces another process could be created which would then look like a dead process below
+	reader, writer := io.Pipe()
+	// Closing the reader also releases the writing goroutine should parsing stop early
+	defer reader.Close()
+	go func() {
+		err := pprof.Lookup("goroutine").WriteTo(writer, 0)
+		_ = writer.CloseWithError(err)
+	}()
+	stacks, err = profile.Parse(reader)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+
+	// Unlock the mutex
+	pm.mutex.Unlock()
+	unlocked = true
+
+	goroutineCount := int64(0)
+
+	// Now walk through the "Sample" slice in the goroutines stack
+	for _, sample := range stacks.Sample {
+		// In the "goroutine" pprof profile each sample represents one or more goroutines
+		// with the same labels and stacktraces.
+
+		// We will represent each goroutine by a `Stack`
+		stack := &Stack{}
+
+		// Add the non-process associated labels from the goroutine sample to the Stack
+		for name, value := range sample.Label {
+			if name == DescriptionPProfLabel || name == PIDPProfLabel || (!flat && name == PPIDPProfLabel) || name == ProcessTypePProfLabel {
+				continue
+			}
+
+			// Labels from the "goroutine" pprof profile only have one value.
+			// This is because the underlying representation is a map[string]string
+			if len(value) != 1 {
+				// Unexpected...
+				return nil, 0, 0, fmt.Errorf("label: %s in goroutine stack with unexpected number of values: %v", name, value)
+			}
+
+			stack.Labels = append(stack.Labels, &Label{Name: name, Value: value[0]})
+		}
+
+		// The number of goroutines that this sample represents is the `stack.Value[0]`
+		stack.Count = sample.Value[0]
+		goroutineCount += stack.Count
+
+		// Now we want to associate this Stack with a Process.
+		var process *Process
+
+		// Try to get the PID from the goroutine labels
+		if pidvalue, ok := sample.Label[PIDPProfLabel]; ok && len(pidvalue) == 1 {
+			pid := IDType(pidvalue[0])
+
+			// Now try to get the process from our map
+			process, ok = processMap[pid]
+			if !ok && pid != "" {
+				// This means that no process has been found in the process map - but there was a process PID
+				// Therefore this goroutine belongs to a dead process and it has escaped control of the process as it
+				// should have died with the process context cancellation.
+
+				// We need to create a dead process holder for this process and label it appropriately
+
+				// get the parent PID
+				ppid := IDType("")
+				if value, ok := sample.Label[PPIDPProfLabel]; ok && len(value) == 1 {
+					ppid = IDType(value[0])
+				}
+
+				// format the description
+				description := "(dead process)"
+				if value, ok := sample.Label[DescriptionPProfLabel]; ok && len(value) == 1 {
+					description = value[0] + " " + description
+				}
+
+				// override the type of the process to NoneProcessType but add the old type as a label on the first stack
+				ptype := NoneProcessType
+				if value, ok := sample.Label[ProcessTypePProfLabel]; ok && len(value) == 1 {
+					stack.Labels = append(stack.Labels, &Label{Name: ProcessTypePProfLabel, Value: value[0]})
+				}
+				process = &Process{
+					PID:         pid,
+					ParentPID:   ppid,
+					Description: description,
+					Type:        ptype,
+				}
+
+				// Now add the dead process back to the map and tree so we don't go back through this again.
+				processMap[process.PID] = process
+				added := false
+				if process.ParentPID != "" && !flat {
+					if parent, ok := processMap[process.ParentPID]; ok {
+						parent.Children = append(parent.Children, process)
+						added = true
+					}
+				}
+				if !added {
+					processes = append(processes, process)
+				}
+			}
+		}
+
+		if process == nil {
+			// This means that the sample we're looking at has no usable PID label
+			var ok bool
+			process, ok = processMap[""]
+			if !ok {
+				// this is the first time we've come across an unassociated goroutine so create a "process" to hold them
+				process = &Process{
+					Description: "(unassociated)",
+					Type:        NoneProcessType,
+				}
+				processMap[process.PID] = process
+				processes = append(processes, process)
+			}
+		}
+
+		// The sample.Location represents a stack trace for this goroutine,
+		// however each Location can represent multiple lines (mostly due to inlining)
+		// so we need to walk the lines too
+		for _, location := range sample.Location {
+			for _, line := range location.Line {
+				entry := &StackEntry{
+					Function: line.Function.Name,
+					File:     line.Function.Filename,
+					Line:     int(line.Line),
+				}
+				stack.Entry = append(stack.Entry, entry)
+			}
+		}
+
+		// Now we need a short descriptive name to call the stack trace when it is folded.
+		// Assuming the stack trace has some lines we'll choose the bottom of the stack (i.e. the
+		// initial function that started the stack trace.) The top of the stack is unlikely to
+		// be very helpful as a lot of the time it will be runtime.select or some other call into
+		// a std library.
+		stack.Description = "(unknown)"
+		if len(stack.Entry) > 0 {
+			stack.Description = stack.Entry[len(stack.Entry)-1].Function
+		}
+
+		process.Stacks = append(process.Stacks, stack)
+	}
+
+	// restrict to not show system processes
+	if noSystem {
+		for i := 0; i < len(processes); i++ {
+			process := processes[i]
+			if process.Type != SystemProcessType && process.Type != NoneProcessType {
+				continue
+			}
+			// Swap the process to the end, cut it off and put its children in its place.
+			// The element swapped into position i has not been checked yet, hence the i--.
+			processes[len(processes)-1], processes[i] = processes[i], processes[len(processes)-1]
+			processes = append(processes[:len(processes)-1], process.Children...)
+			i--
+		}
+	}
+
+	// Now finally re-sort the processes. Newest process appears first
+	after := func(processes []*Process) func(i, j int) bool {
+		return func(i, j int) bool {
+			left, right := processes[i], processes[j]
+			return left.Start.After(right.Start)
+		}
+	}
+	sort.Slice(processes, after(processes))
+	if !flat {
+		var sortChildren func(process *Process)
+
+		sortChildren = func(process *Process) {
+			sort.Slice(process.Children, after(process.Children))
+			for _, child := range process.Children {
+				sortChildren(child)
+			}
+		}
+
+		// Apply the same ordering to every level of the tree. Without these calls the
+		// closure above is never run and the children stay in map iteration order.
+		for _, process := range processes {
+			sortChildren(process)
+		}
+	}
+
+	return processes, processCount, goroutineCount, nil
+}
diff --git a/modules/process/manager_test.go b/modules/process/manager_test.go
index 152c7a9235..30eabeb37a 100644
--- a/modules/process/manager_test.go
+++ b/modules/process/manager_test.go
@@ -22,7 +22,7 @@ func TestGetManager(t *testing.T) {
}
func TestManager_AddContext(t *testing.T) {
- pm := Manager{processes: make(map[IDType]*Process), next: 1}
+ pm := Manager{processMap: make(map[IDType]*process), next: 1}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -41,7 +41,7 @@ func TestManager_AddContext(t *testing.T) {
}
func TestManager_Cancel(t *testing.T) {
- pm := Manager{processes: make(map[IDType]*Process), next: 1}
+ pm := Manager{processMap: make(map[IDType]*process), next: 1}
ctx, _, finished := pm.AddContext(context.Background(), "foo")
defer finished()
@@ -69,7 +69,7 @@ func TestManager_Cancel(t *testing.T) {
}
func TestManager_Remove(t *testing.T) {
- pm := Manager{processes: make(map[IDType]*Process), next: 1}
+ pm := Manager{processMap: make(map[IDType]*process), next: 1}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -85,7 +85,7 @@ func TestManager_Remove(t *testing.T) {
pm.Remove(GetPID(p2Ctx))
- _, exists := pm.processes[GetPID(p2Ctx)]
+ _, exists := pm.processMap[GetPID(p2Ctx)]
assert.False(t, exists, "PID %d is in the list but shouldn't", GetPID(p2Ctx))
}
diff --git a/modules/process/process.go b/modules/process/process.go
index 662f878d7f..2f7ea18373 100644
--- a/modules/process/process.go
+++ b/modules/process/process.go
@@ -6,61 +6,34 @@ package process
import (
"context"
- "sync"
"time"
)
-// Process represents a working process inheriting from Gitea.
-type Process struct {
+// Process types recorded in the Type field of a process.
+var (
+ // SystemProcessType is the type given to long-running background services,
+ // e.g. the queues, the built-in SSH server and the slow query detector
+ SystemProcessType = "system"
+ // RequestProcessType is presumably the type of processes created for incoming requests - TODO confirm against callers
+ RequestProcessType = "request"
+ // NormalProcessType is presumably the type of ordinary processes that are neither system nor request - TODO confirm against callers
+ NormalProcessType = "normal"
+ // NoneProcessType is the type of the holders that ProcessStacktraces creates for
+ // goroutines of dead processes and for goroutines without a process
+ NoneProcessType = "none"
+)
+
+// process represents a working process inheriting from Gitea.
+type process struct {
PID IDType // Process ID, not system one.
ParentPID IDType
Description string
Start time.Time
Cancel context.CancelFunc
-
- lock sync.Mutex
- children []*Process
-}
-
-// Children gets the children of the process
-// Note: this function will behave nicely even if p is nil
-func (p *Process) Children() (children []*Process) {
- if p == nil {
- return
- }
-
- p.lock.Lock()
- defer p.lock.Unlock()
- children = make([]*Process, len(p.children))
- copy(children, p.children)
- return children
+ Type string
}
-// AddChild adds a child process
-// Note: this function will behave nicely even if p is nil
-func (p *Process) AddChild(child *Process) {
- if p == nil {
- return
- }
-
- p.lock.Lock()
- defer p.lock.Unlock()
- p.children = append(p.children, child)
-}
-
-// RemoveChild removes a child process
-// Note: this function will behave nicely even if p is nil
-func (p *Process) RemoveChild(process *Process) {
- if p == nil {
- return
- }
-
- p.lock.Lock()
- defer p.lock.Unlock()
- for i, child := range p.children {
- if child == process {
- p.children = append(p.children[:i], p.children[i+1:]...)
- return
- }
+// toProcess converts a process to an externally usable Process
+// Only PID, ParentPID, Description, Start and Type are copied: the Cancel function is
+// not exposed and Children and Stacks are left empty for the caller to fill.
+func (p *process) toProcess() *Process {
+	return &Process{
+		PID:         p.PID,
+		ParentPID:   p.ParentPID,
+		Description: p.Description,
+		Start:       p.Start,
+		Type:        p.Type,
+	}
 }
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index ead3828f33..99c6428abc 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -20,7 +21,6 @@ import (
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
- Name string
WaitOnEmpty bool
}
@@ -153,6 +153,7 @@ func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
// Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)
@@ -355,6 +356,7 @@ func (q *ByteFIFOQueue) Terminate() {
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
+ q.baseCtxFinished()
log.Debug("%s: %s Terminated", q.typ, q.name)
}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 5469c03100..028023d500 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync/atomic"
"time"
@@ -20,7 +21,6 @@ const ChannelQueueType Type = "channel"
type ChannelQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
- Name string
}
// ChannelQueue implements Queue
@@ -84,6 +84,7 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
// Run starts to run the queue
func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("ChannelQueue: %s Starting", q.name)
@@ -169,6 +170,7 @@ func (q *ChannelQueue) Terminate() {
default:
}
q.terminateCtxCancel()
+ q.baseCtxFinished()
log.Debug("ChannelQueue: %s Terminated", q.name)
}
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index 26a635b918..d30b908861 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -34,9 +34,9 @@ func TestChannelQueue(t *testing.T) {
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
+ Name: "TestChannelQueue",
},
Workers: 0,
- Name: "TestChannelQueue",
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 0494698e0e..014d93f5b5 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -72,9 +73,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
+ Name: config.Name + "-channel",
},
Workers: config.Workers,
- Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -90,9 +91,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
MaxWorkers: 5,
+ Name: config.Name + "-level",
},
Workers: 0,
- Name: config.Name + "-level",
},
DataDir: config.DataDir,
}
@@ -154,6 +155,7 @@ func (q *PersistableChannelQueue) PushBack(data Data) error {
// Run starts to run the queue
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index b7282e6c6c..6e8d37a20c 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -7,6 +7,7 @@ package queue
import (
"context"
"fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -97,6 +98,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
// Run starts to run the queue
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
@@ -226,6 +228,7 @@ func (q *ChannelUniqueQueue) Terminate() {
default:
}
q.terminateCtxCancel()
+ q.baseCtxFinished()
log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
}
diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go
index ef6752079e..6daf3fc96e 100644
--- a/modules/queue/unique_queue_channel_test.go
+++ b/modules/queue/unique_queue_channel_test.go
@@ -32,9 +32,9 @@ func TestChannelUniqueQueue(t *testing.T) {
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
+ Name: "TestChannelQueue",
},
Workers: 0,
- Name: "TestChannelQueue",
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index 5ee1c396fc..6ab03094ba 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -6,6 +6,7 @@ package queue
import (
"context"
+ "runtime/pprof"
"sync"
"time"
@@ -72,9 +73,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
+ Name: config.Name + "-channel",
},
Workers: config.Workers,
- Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -90,9 +91,9 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
MaxWorkers: 5,
+ Name: config.Name + "-level",
},
Workers: 0,
- Name: config.Name + "-level",
},
DataDir: config.DataDir,
}
@@ -183,6 +184,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
// Run starts to run the queue
func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
q.lock.Lock()
@@ -301,6 +303,7 @@ func (q *PersistableChannelUniqueQueue) Terminate() {
if q.internal != nil {
q.internal.(*LevelUniqueQueue).Terminate()
}
+ q.channelQueue.baseCtxFinished()
log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
}
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 5f6ec18710..2d8504598a 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -6,11 +6,14 @@ package queue
import (
"context"
+ "fmt"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/util"
)
@@ -22,6 +25,7 @@ type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
baseCtxCancel context.CancelFunc
+ baseCtxFinished process.FinishedFunc
paused chan struct{}
resumed chan struct{}
cond *sync.Cond
@@ -44,6 +48,7 @@ var (
// WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WorkerPoolConfiguration struct {
+ Name string
QueueLength int
BatchLength int
BlockTimeout time.Duration
@@ -54,12 +59,13 @@ type WorkerPoolConfiguration struct {
// NewWorkerPool creates a new worker pool
func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Queue: %s", config.Name), process.SystemProcessType, false)
dataChan := make(chan Data, config.QueueLength)
pool := &WorkerPool{
baseCtx: ctx,
baseCtxCancel: cancel,
+ baseCtxFinished: finished,
batchLength: config.BatchLength,
dataChan: dataChan,
resumed: closedChan,
@@ -299,6 +305,7 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.numberOfWorkers++
p.lock.Unlock()
go func() {
+ pprof.SetGoroutineLabels(ctx)
p.doWork(ctx)
p.lock.Lock()
@@ -476,6 +483,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
}
func (p *WorkerPool) doWork(ctx context.Context) {
+ pprof.SetGoroutineLabels(ctx)
delay := time.Millisecond * 300
// Create a common timer - we will use this elsewhere
diff --git a/modules/ssh/ssh.go b/modules/ssh/ssh.go
index 1a92edb795..44ed431c93 100644
--- a/modules/ssh/ssh.go
+++ b/modules/ssh/ssh.go
@@ -23,7 +23,9 @@ import (
"syscall"
asymkey_model "code.gitea.io/gitea/models/asymkey"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
@@ -317,7 +319,11 @@ func Listen(host string, port int, ciphers, keyExchanges, macs []string) {
}
}
- go listen(&srv)
+ go func() {
+ _, _, finished := process.GetManager().AddTypedContext(graceful.GetManager().HammerContext(), "Service: Built-in SSH server", process.SystemProcessType, true)
+ defer finished()
+ listen(&srv)
+ }()
}
// GenKeyPair make a pair of public and private keys for SSH access.
diff --git a/modules/web/routing/logger_manager.go b/modules/web/routing/logger_manager.go
index cc434c338d..7715b0b5d3 100644
--- a/modules/web/routing/logger_manager.go
+++ b/modules/web/routing/logger_manager.go
@@ -11,6 +11,7 @@ import (
"time"
"code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/process"
)
// Event indicates when the printer is triggered
@@ -40,7 +41,9 @@ type requestRecordsManager struct {
}
func (manager *requestRecordsManager) startSlowQueryDetector(threshold time.Duration) {
- go graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) {
+ go graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
+ ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: SlowQueryDetector", process.SystemProcessType, true)
+ defer finished()
// This go-routine checks all active requests every second.
// If a request has been running for a long time (eg: /user/events), we also print a log with "still-executing" message
// After the "still-executing" log is printed, the record will be removed from the map to prevent from duplicated logs in future
@@ -49,7 +52,7 @@ func (manager *requestRecordsManager) startSlowQueryDetector(threshold time.Dura
t := time.NewTicker(time.Second)
for {
select {
- case <-baseCtx.Done():
+ case <-ctx.Done():
return
case <-t.C:
now := time.Now()