aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--modules/graceful/context.go43
-rw-r--r--modules/graceful/manager.go62
-rw-r--r--modules/graceful/manager_unix.go1
-rw-r--r--modules/graceful/manager_windows.go1
-rw-r--r--modules/indexer/code/indexer.go6
-rw-r--r--modules/indexer/issues/indexer.go71
-rw-r--r--modules/indexer/stats/queue.go8
-rw-r--r--modules/log/event_writer_base.go11
-rw-r--r--modules/log/event_writer_conn_test.go2
-rw-r--r--modules/log/init.go19
-rw-r--r--modules/log/logger_impl.go4
-rw-r--r--modules/log/logger_test.go8
-rw-r--r--modules/log/manager.go4
-rw-r--r--modules/mirror/mirror.go8
-rw-r--r--modules/notification/ui/ui.go7
-rw-r--r--modules/process/manager.go11
-rw-r--r--modules/queue/manager.go12
-rw-r--r--modules/queue/manager_test.go6
-rw-r--r--modules/queue/workergroup.go24
-rw-r--r--modules/queue/workerqueue.go33
-rw-r--r--modules/queue/workerqueue_test.go30
-rw-r--r--routers/web/admin/queue_tester.go15
-rw-r--r--services/actions/init.go8
-rw-r--r--services/automerge/automerge.go6
-rw-r--r--services/mailer/mailer.go12
-rw-r--r--services/pull/check.go6
-rw-r--r--services/pull/check_test.go18
-rw-r--r--services/repository/archiver/archiver.go9
-rw-r--r--services/repository/push.go7
-rw-r--r--services/task/task.go9
-rw-r--r--services/webhook/deliver.go6
31 files changed, 204 insertions, 263 deletions
diff --git a/modules/graceful/context.go b/modules/graceful/context.go
index 6b5b207ff3..4fcbcb04b6 100644
--- a/modules/graceful/context.go
+++ b/modules/graceful/context.go
@@ -5,51 +5,8 @@ package graceful
import (
"context"
- "time"
)
-// ChannelContext is a context that wraps a channel and error as a context
-type ChannelContext struct {
- done <-chan struct{}
- err error
-}
-
-// NewChannelContext creates a ChannelContext from a channel and error
-func NewChannelContext(done <-chan struct{}, err error) *ChannelContext {
- return &ChannelContext{
- done: done,
- err: err,
- }
-}
-
-// Deadline returns the time when work done on behalf of this context
-// should be canceled. There is no Deadline for a ChannelContext
-func (ctx *ChannelContext) Deadline() (deadline time.Time, ok bool) {
- return deadline, ok
-}
-
-// Done returns the channel provided at the creation of this context.
-// When closed, work done on behalf of this context should be canceled.
-func (ctx *ChannelContext) Done() <-chan struct{} {
- return ctx.done
-}
-
-// Err returns nil, if Done is not closed. If Done is closed,
-// Err returns the error provided at the creation of this context
-func (ctx *ChannelContext) Err() error {
- select {
- case <-ctx.done:
- return ctx.err
- default:
- return nil
- }
-}
-
-// Value returns nil for all calls as no values are or can be associated with this context
-func (ctx *ChannelContext) Value(key interface{}) interface{} {
- return nil
-}
-
// ShutdownContext returns a context.Context that is Done at shutdown
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go
index d32788092d..3604c0a3f5 100644
--- a/modules/graceful/manager.go
+++ b/modules/graceful/manager.go
@@ -23,6 +23,11 @@ const (
stateTerminate
)
+type RunCanceler interface {
+ Run()
+ Cancel()
+}
+
// There are some places that could inherit sockets:
//
// * HTTP or HTTPS main listener
@@ -55,46 +60,19 @@ func InitManager(ctx context.Context) {
})
}
-// WithCallback is a runnable to call when the caller has finished
-type WithCallback func(callback func())
-
-// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
-// After the callback to atShutdown is called and is complete, the main function must return.
-// Similarly the callback function provided to atTerminate must return once termination is complete.
-// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
-// - users must therefore be careful to only call these as necessary.
-type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
-
-// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
-// After the callback to atShutdown is called and is complete, the main function must return.
-// Similarly the callback function provided to atTerminate must return once termination is complete.
-// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
-// - users must therefore be careful to only call these as necessary.
-func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
+// RunWithCancel helps to run a function with a custom context, the Cancel function will be called at shutdown
+// The Cancel function should stop the Run function in predictable time.
+func (g *Manager) RunWithCancel(rc RunCanceler) {
+ g.RunAtShutdown(context.Background(), rc.Cancel)
g.runningServerWaitGroup.Add(1)
defer g.runningServerWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
- log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
+ log.Critical("PANIC during RunWithCancel: %v\nStacktrace: %s", err, log.Stack(2))
g.doShutdown()
}
}()
- run(func(atShutdown func()) {
- g.lock.Lock()
- defer g.lock.Unlock()
- g.toRunAtShutdown = append(g.toRunAtShutdown,
- func() {
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
- g.doShutdown()
- }
- }()
- atShutdown()
- })
- }, func(atTerminate func()) {
- g.RunAtTerminate(atTerminate)
- })
+ rc.Run()
}
// RunWithShutdownContext takes a function that has a context to watch for shutdown.
@@ -151,21 +129,6 @@ func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
})
}
-// RunAtHammer creates a go-routine to run the provided function at shutdown
-func (g *Manager) RunAtHammer(hammer func()) {
- g.lock.Lock()
- defer g.lock.Unlock()
- g.toRunAtHammer = append(g.toRunAtHammer,
- func() {
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
- }
- }()
- hammer()
- })
-}
-
func (g *Manager) doShutdown() {
if !g.setStateTransition(stateRunning, stateShuttingDown) {
g.DoImmediateHammer()
@@ -206,9 +169,6 @@ func (g *Manager) doHammerTime(d time.Duration) {
g.hammerCtxCancel()
atHammerCtx := pprof.WithLabels(g.terminateCtx, pprof.Labels("graceful-lifecycle", "post-hammer"))
pprof.SetGoroutineLabels(atHammerCtx)
- for _, fn := range g.toRunAtHammer {
- go fn()
- }
}
g.lock.Unlock()
}
diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go
index f949abfd21..b1fd6da76d 100644
--- a/modules/graceful/manager_unix.go
+++ b/modules/graceful/manager_unix.go
@@ -41,7 +41,6 @@ type Manager struct {
terminateWaitGroup sync.WaitGroup
toRunAtShutdown []func()
- toRunAtHammer []func()
toRunAtTerminate []func()
}
diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go
index f3606084aa..f676f86d04 100644
--- a/modules/graceful/manager_windows.go
+++ b/modules/graceful/manager_windows.go
@@ -50,7 +50,6 @@ type Manager struct {
shutdownRequested chan struct{}
toRunAtShutdown []func()
- toRunAtHammer []func()
toRunAtTerminate []func()
}
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index e9b8e76500..f38fd6000c 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -166,7 +166,7 @@ func Init() {
handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
idx, err := indexer.get()
if idx == nil || err != nil {
- log.Error("Codes indexer handler: unable to get indexer!")
+ log.Warn("Codes indexer handler: indexer is not ready, retry later.")
return items
}
@@ -201,7 +201,7 @@ func Init() {
return unhandled
}
- indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
+ indexerQueue = queue.CreateUniqueQueue(ctx, "code_indexer", handler)
if indexerQueue == nil {
log.Fatal("Unable to create codes indexer queue")
}
@@ -259,7 +259,7 @@ func Init() {
indexer.set(rIndexer)
// Start processing the queue
- go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
+ go graceful.GetManager().RunWithCancel(indexerQueue)
if populate {
go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 76ff80ffca..f36ea10935 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -102,7 +102,7 @@ var (
func InitIssueIndexer(syncReindex bool) {
ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
- waitChannel := make(chan time.Duration, 1)
+ indexerInitWaitChannel := make(chan time.Duration, 1)
// Create the Queue
switch setting.Indexer.IssueType {
@@ -110,7 +110,7 @@ func InitIssueIndexer(syncReindex bool) {
handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
indexer := holder.get()
if indexer == nil {
- log.Error("Issue indexer handler: unable to get indexer.")
+ log.Warn("Issue indexer handler: indexer is not ready, retry later.")
return items
}
toIndex := make([]*IndexerData, 0, len(items))
@@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) {
return unhandled
}
- issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)
+ issueIndexerQueue = queue.CreateSimpleQueue(ctx, "issue_indexer", handler)
if issueIndexerQueue == nil {
log.Fatal("Unable to create issue indexer queue")
}
default:
- issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
+ issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil)
}
+ graceful.GetManager().RunAtTerminate(finished)
+
// Create the Indexer
go func() {
pprof.SetGoroutineLabels(ctx)
@@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) {
if issueIndexer != nil {
issueIndexer.Close()
}
- finished()
log.Info("PID: %d Issue Indexer closed", os.Getpid())
})
log.Debug("Created Bleve Indexer")
case "elasticsearch":
- graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
- pprof.SetGoroutineLabels(ctx)
- issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
- if err != nil {
- log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- exist, err := issueIndexer.Init()
- if err != nil {
- log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- populate = !exist
- holder.set(issueIndexer)
- atTerminate(finished)
- })
+ issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
+ if err != nil {
+ log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
- graceful.GetManager().RunAtTerminate(finished)
case "meilisearch":
- graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
- pprof.SetGoroutineLabels(ctx)
- issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
- if err != nil {
- log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- exist, err := issueIndexer.Init()
- if err != nil {
- log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- populate = !exist
- holder.set(issueIndexer)
- atTerminate(finished)
- })
+ issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
+ if err != nil {
+ log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
default:
holder.cancel()
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
// Start processing the queue
- go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
+ go graceful.GetManager().RunWithCancel(issueIndexerQueue)
// Populate the index
if populate {
@@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) {
go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
}
}
- waitChannel <- time.Since(start)
- close(waitChannel)
+
+ indexerInitWaitChannel <- time.Since(start)
+ close(indexerInitWaitChannel)
}()
if syncReindex {
select {
- case <-waitChannel:
+ case <-indexerInitWaitChannel:
case <-graceful.GetManager().IsShutdown():
}
} else if setting.Indexer.StartupTimeout > 0 {
@@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) {
timeout += setting.GracefulHammerTime
}
select {
- case duration := <-waitChannel:
+ case duration := <-indexerInitWaitChannel:
log.Info("Issue Indexer Initialization took %v", duration)
case <-graceful.GetManager().IsShutdown():
log.Warn("Shutdown occurred before issue index initialisation was complete")
diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go
index 46438925e4..d002bd57cf 100644
--- a/modules/indexer/stats/queue.go
+++ b/modules/indexer/stats/queue.go
@@ -29,13 +29,11 @@ func handler(items ...int64) []int64 {
}
func initStatsQueue() error {
- statsQueue = queue.CreateUniqueQueue("repo_stats_update", handler)
+ statsQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo_stats_update", handler)
if statsQueue == nil {
- return fmt.Errorf("Unable to create repo_stats_update Queue")
+ return fmt.Errorf("unable to create repo_stats_update queue")
}
-
- go graceful.GetManager().RunWithShutdownFns(statsQueue.Run)
-
+ go graceful.GetManager().RunWithCancel(statsQueue)
return nil
}
diff --git a/modules/log/event_writer_base.go b/modules/log/event_writer_base.go
index f61d9a7b9d..1d45d579c0 100644
--- a/modules/log/event_writer_base.go
+++ b/modules/log/event_writer_base.go
@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"regexp"
+ "runtime/pprof"
"time"
)
@@ -143,9 +144,17 @@ func eventWriterStartGo(ctx context.Context, w EventWriter, shared bool) {
}
w.Base().shared = shared
w.Base().stopped = make(chan struct{})
+
+ ctxDesc := "Logger: EventWriter: " + w.GetWriterName()
+ if shared {
+ ctxDesc = "Logger: EventWriter (shared): " + w.GetWriterName()
+ }
+ writerCtx, writerCancel := newContext(ctx, ctxDesc)
go func() {
+ defer writerCancel()
defer close(w.Base().stopped)
- w.Run(ctx)
+ pprof.SetGoroutineLabels(writerCtx)
+ w.Run(writerCtx)
}()
}
diff --git a/modules/log/event_writer_conn_test.go b/modules/log/event_writer_conn_test.go
index e08ec025a3..69e87aa8c4 100644
--- a/modules/log/event_writer_conn_test.go
+++ b/modules/log/event_writer_conn_test.go
@@ -40,7 +40,7 @@ func TestConnLogger(t *testing.T) {
level := INFO
flags := LstdFlags | LUTC | Lfuncname
- logger := NewLoggerWithWriters(context.Background(), NewEventWriterConn("test-conn", WriterMode{
+ logger := NewLoggerWithWriters(context.Background(), "test", NewEventWriterConn("test-conn", WriterMode{
Level: level,
Prefix: prefix,
Flags: FlagsFromBits(flags),
diff --git a/modules/log/init.go b/modules/log/init.go
index 798ba86410..38a3ad60a5 100644
--- a/modules/log/init.go
+++ b/modules/log/init.go
@@ -4,14 +4,19 @@
package log
import (
+ "context"
"runtime"
"strings"
+ "sync/atomic"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/util/rotatingfilewriter"
)
-var projectPackagePrefix string
+var (
+ projectPackagePrefix string
+ processTraceDisabled atomic.Int64
+)
func init() {
_, filename, _, _ := runtime.Caller(0)
@@ -24,6 +29,10 @@ func init() {
rotatingfilewriter.ErrorPrintf = FallbackErrorf
process.Trace = func(start bool, pid process.IDType, description string, parentPID process.IDType, typ string) {
+ // the logger manager has its own mutex lock, so it's safe to use "Load" here
+ if processTraceDisabled.Load() != 0 {
+ return
+ }
if start && parentPID != "" {
Log(1, TRACE, "Start %s: %s (from %s) (%s)", NewColoredValue(pid, FgHiYellow), description, NewColoredValue(parentPID, FgYellow), NewColoredValue(typ, Reset))
} else if start {
@@ -33,3 +42,11 @@ func init() {
}
}
}
+
+func newContext(parent context.Context, desc string) (ctx context.Context, cancel context.CancelFunc) {
+ // the "process manager" also calls "log.Trace()" to output logs, so if we want to create new contexts by the manager, we need to disable the trace temporarily
+ processTraceDisabled.Add(1)
+ defer processTraceDisabled.Add(-1)
+ ctx, _, cancel = process.GetManager().AddTypedContext(parent, desc, process.SystemProcessType, false)
+ return ctx, cancel
+}
diff --git a/modules/log/logger_impl.go b/modules/log/logger_impl.go
index c7e8fde3c0..abd72d326f 100644
--- a/modules/log/logger_impl.go
+++ b/modules/log/logger_impl.go
@@ -228,9 +228,9 @@ func (l *LoggerImpl) GetLevel() Level {
return Level(l.level.Load())
}
-func NewLoggerWithWriters(ctx context.Context, writer ...EventWriter) *LoggerImpl {
+func NewLoggerWithWriters(ctx context.Context, name string, writer ...EventWriter) *LoggerImpl {
l := &LoggerImpl{}
- l.ctx, l.ctxCancel = context.WithCancel(ctx)
+ l.ctx, l.ctxCancel = newContext(ctx, "Logger: "+name)
l.LevelLogger = BaseLoggerToGeneralLogger(l)
l.eventWriters = map[string]EventWriter{}
l.syncLevelInternal()
diff --git a/modules/log/logger_test.go b/modules/log/logger_test.go
index a91b4d23af..70222f64f5 100644
--- a/modules/log/logger_test.go
+++ b/modules/log/logger_test.go
@@ -53,7 +53,7 @@ func newDummyWriter(name string, level Level, delay time.Duration) *dummyWriter
}
func TestLogger(t *testing.T) {
- logger := NewLoggerWithWriters(context.Background())
+ logger := NewLoggerWithWriters(context.Background(), "test")
dump := logger.DumpWriters()
assert.EqualValues(t, 0, len(dump))
@@ -88,7 +88,7 @@ func TestLogger(t *testing.T) {
}
func TestLoggerPause(t *testing.T) {
- logger := NewLoggerWithWriters(context.Background())
+ logger := NewLoggerWithWriters(context.Background(), "test")
w1 := newDummyWriter("dummy-1", DEBUG, 0)
logger.AddWriters(w1)
@@ -117,7 +117,7 @@ func (t testLogString) LogString() string {
}
func TestLoggerLogString(t *testing.T) {
- logger := NewLoggerWithWriters(context.Background())
+ logger := NewLoggerWithWriters(context.Background(), "test")
w1 := newDummyWriter("dummy-1", DEBUG, 0)
w1.Mode.Colorize = true
@@ -130,7 +130,7 @@ func TestLoggerLogString(t *testing.T) {
}
func TestLoggerExpressionFilter(t *testing.T) {
- logger := NewLoggerWithWriters(context.Background())
+ logger := NewLoggerWithWriters(context.Background(), "test")
w1 := newDummyWriter("dummy-1", DEBUG, 0)
w1.Mode.Expression = "foo.*"
diff --git a/modules/log/manager.go b/modules/log/manager.go
index bbaef7eb20..b5d6cbf8e1 100644
--- a/modules/log/manager.go
+++ b/modules/log/manager.go
@@ -39,7 +39,7 @@ func (m *LoggerManager) GetLogger(name string) *LoggerImpl {
logger := m.loggers[name]
if logger == nil {
- logger = NewLoggerWithWriters(m.ctx)
+ logger = NewLoggerWithWriters(m.ctx, name)
m.loggers[name] = logger
if name == DEFAULT {
m.defaultLogger.Store(logger)
@@ -137,6 +137,6 @@ func GetManager() *LoggerManager {
func NewManager() *LoggerManager {
m := &LoggerManager{writers: map[string]EventWriter{}, loggers: map[string]*LoggerImpl{}}
- m.ctx, m.ctxCancel = context.WithCancel(context.Background())
+ m.ctx, m.ctxCancel = newContext(context.Background(), "LoggerManager")
return m
}
diff --git a/modules/mirror/mirror.go b/modules/mirror/mirror.go
index 73e591adba..0d9a624730 100644
--- a/modules/mirror/mirror.go
+++ b/modules/mirror/mirror.go
@@ -33,9 +33,11 @@ func StartSyncMirrors(queueHandle func(data ...*SyncRequest) []*SyncRequest) {
if !setting.Mirror.Enabled {
return
}
- mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle)
-
- go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
+ mirrorQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "mirror", queueHandle)
+ if mirrorQueue == nil {
+ log.Fatal("Unable to create mirror queue")
+ }
+ go graceful.GetManager().RunWithCancel(mirrorQueue)
}
// AddPullMirrorToQueue adds repoID to mirror queue
diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go
index a4576b6791..6c5f22c122 100644
--- a/modules/notification/ui/ui.go
+++ b/modules/notification/ui/ui.go
@@ -37,7 +37,10 @@ var _ base.Notifier = &notificationService{}
// NewNotifier create a new notificationService notifier
func NewNotifier() base.Notifier {
ns := &notificationService{}
- ns.issueQueue = queue.CreateSimpleQueue("notification-service", handler)
+ ns.issueQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "notification-service", handler)
+ if ns.issueQueue == nil {
+ log.Fatal("Unable to create notification-service queue")
+ }
return ns
}
@@ -51,7 +54,7 @@ func handler(items ...issueNotificationOpts) []issueNotificationOpts {
}
func (ns *notificationService) Run() {
- go graceful.GetManager().RunWithShutdownFns(ns.issueQueue.Run)
+ go graceful.GetManager().RunWithCancel(ns.issueQueue) // TODO: using "go" here doesn't seem right, just leave it as old code
}
func (ns *notificationService) NotifyCreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository,
diff --git a/modules/process/manager.go b/modules/process/manager.go
index fdfca3db7a..25d503c594 100644
--- a/modules/process/manager.go
+++ b/modules/process/manager.go
@@ -167,6 +167,7 @@ func (pm *Manager) Add(ctx context.Context, description string, cancel context.C
pm.processMap[pid] = process
pm.mutex.Unlock()
+
Trace(true, pid, description, parentPID, processType)
pprofCtx := pprof.WithLabels(ctx, pprof.Labels(DescriptionPProfLabel, description, PPIDPProfLabel, string(parentPID), PIDPProfLabel, string(pid), ProcessTypePProfLabel, processType))
@@ -200,10 +201,16 @@ func (pm *Manager) nextPID() (start time.Time, pid IDType) {
}
func (pm *Manager) remove(process *process) {
+ deleted := false
+
pm.mutex.Lock()
- defer pm.mutex.Unlock()
- if p := pm.processMap[process.PID]; p == process {
+ if pm.processMap[process.PID] == process {
delete(pm.processMap, process.PID)
+ deleted = true
+ }
+ pm.mutex.Unlock()
+
+ if deleted {
Trace(false, process.PID, process.Description, process.ParentPID, process.Type)
}
}
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 95b3bad57b..8b964c0c28 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -88,22 +88,22 @@ func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
}
// CreateSimpleQueue creates a simple queue from global setting config provider by name
-func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
- return createWorkerPoolQueue(name, setting.CfgProvider, handler, false)
+func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
+ return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
}
// CreateUniqueQueue creates a unique queue from global setting config provider by name
-func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
- return createWorkerPoolQueue(name, setting.CfgProvider, handler, true)
+func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
+ return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
}
-func createWorkerPoolQueue[T any](name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
+func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
if err != nil {
log.Error("Failed to get queue settings for %q: %v", name, err)
return nil
}
- w, err := NewWorkerPoolQueueBySetting(name, queueSetting, handler, unique)
+ w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
if err != nil {
log.Error("Failed to create queue %q: %v", name, err)
return nil
diff --git a/modules/queue/manager_test.go b/modules/queue/manager_test.go
index 50265e27b6..1fd29f813f 100644
--- a/modules/queue/manager_test.go
+++ b/modules/queue/manager_test.go
@@ -29,7 +29,7 @@ func TestManager(t *testing.T) {
if err != nil {
return nil, err
}
- return NewWorkerPoolQueueBySetting(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
+ return newWorkerPoolQueueForTest(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
}
// test invalid CONN_STR
@@ -80,7 +80,7 @@ MAX_WORKERS = 2
assert.NoError(t, err)
- q1 := createWorkerPoolQueue[string]("no-such", cfgProvider, nil, false)
+ q1 := createWorkerPoolQueue[string](context.Background(), "no-such", cfgProvider, nil, false)
assert.Equal(t, "no-such", q1.GetName())
assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataFullDir)
@@ -96,7 +96,7 @@ MAX_WORKERS = 2
assert.Equal(t, "string", q1.GetItemTypeName())
qid1 := GetManager().qidCounter
- q2 := createWorkerPoolQueue("sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
+ q2 := createWorkerPoolQueue(context.Background(), "sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
assert.Equal(t, "sub", q2.GetName())
assert.Equal(t, "level", q2.GetType())
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataFullDir)
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index 7127ea1117..147a4f335e 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -5,6 +5,7 @@ package queue
import (
"context"
+ "runtime/pprof"
"sync"
"sync/atomic"
"time"
@@ -13,9 +14,10 @@ import (
)
var (
- infiniteTimerC = make(chan time.Time)
- batchDebounceDuration = 100 * time.Millisecond
- workerIdleDuration = 1 * time.Second
+ infiniteTimerC = make(chan time.Time)
+ batchDebounceDuration = 100 * time.Millisecond
+ workerIdleDuration = 1 * time.Second
+ shutdownDefaultTimeout = 2 * time.Second
unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
)
@@ -116,13 +118,15 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
// If the queue is shutting down, it returns true and try to push the items
// Otherwise it does nothing and returns false
func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
- ctxShutdown := q.ctxShutdown.Load()
- if ctxShutdown == nil {
+ shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
+ if shutdownTimeout == 0 {
return false
}
+ ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
+ defer ctxShutdownCancel()
for _, item := range items {
// if there is still any error, the queue can do nothing instead of losing the items
- if err := q.baseQueue.PushItem(*ctxShutdown, q.marshal(item)); err != nil {
+ if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
}
}
@@ -246,6 +250,8 @@ var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip re
// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
func (q *WorkerPoolQueue[T]) doRun() {
+ pprof.SetGoroutineLabels(q.ctxRun)
+
log.Debug("Queue %q starts running", q.GetName())
defer log.Debug("Queue %q stops running", q.GetName())
@@ -271,8 +277,8 @@ func (q *WorkerPoolQueue[T]) doRun() {
}
}
- ctxShutdownPtr := q.ctxShutdown.Load()
- if ctxShutdownPtr != nil {
+ shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
+ if shutdownTimeout != 0 {
// if there is a shutdown context, try to push the items back to the base queue
q.basePushForShutdown(unhandled...)
workerDone := make(chan struct{})
@@ -280,7 +286,7 @@ func (q *WorkerPoolQueue[T]) doRun() {
go func() { wg.wg.Wait(); close(workerDone) }()
select {
case <-workerDone:
- case <-(*ctxShutdownPtr).Done():
+ case <-time.After(shutdownTimeout):
log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
}
} else {
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index 5695c6cc23..e0d5183bd9 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -10,9 +10,9 @@ import (
"sync/atomic"
"time"
- "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
)
@@ -21,8 +21,9 @@ import (
type WorkerPoolQueue[T any] struct {
ctxRun context.Context
ctxRunCancel context.CancelFunc
- ctxShutdown atomic.Pointer[context.Context]
- shutdownDone chan struct{}
+
+ shutdownDone chan struct{}
+ shutdownTimeout atomic.Int64 // in case some buggy handlers (workers) would hang forever, "shutdown" should finish in predictable time
origHandler HandlerFuncT[T]
safeHandler HandlerFuncT[T]
@@ -175,22 +176,19 @@ func (q *WorkerPoolQueue[T]) Has(data T) (bool, error) {
return q.baseQueue.HasItem(q.ctxRun, q.marshal(data))
}
-func (q *WorkerPoolQueue[T]) Run(atShutdown, atTerminate func(func())) {
- atShutdown(func() {
- // in case some queue handlers are slow or have hanging bugs, at most wait for a short time
- q.ShutdownWait(1 * time.Second)
- })
+func (q *WorkerPoolQueue[T]) Run() {
q.doRun()
}
+func (q *WorkerPoolQueue[T]) Cancel() {
+ q.ctxRunCancel()
+}
+
// ShutdownWait shuts down the queue, waits for all workers to finish their jobs, and pushes the unhandled items back to the base queue
// It waits for all workers (handlers) to finish their jobs, in case some buggy handlers would hang forever, a reasonable timeout is needed
func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
- shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), timeout)
- defer shutdownCtxCancel()
- if q.ctxShutdown.CompareAndSwap(nil, &shutdownCtx) {
- q.ctxRunCancel()
- }
+ q.shutdownTimeout.Store(int64(timeout))
+ q.ctxRunCancel()
<-q.shutdownDone
}
@@ -207,7 +205,11 @@ func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQu
}
}
-func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
+func newWorkerPoolQueueForTest[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
+ return NewWorkerPoolQueueWithContext(context.Background(), name, queueSetting, handler, unique)
+}
+
+func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
if handler == nil {
log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name)
queueSetting.Type = "dummy"
@@ -224,10 +226,11 @@ func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueS
}
log.Trace("Created queue %q of type %q", name, queueType)
- w.ctxRun, w.ctxRunCancel = context.WithCancel(graceful.GetManager().ShutdownContext())
+ w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false)
w.batchChan = make(chan []T)
w.flushChan = make(chan flushType)
w.shutdownDone = make(chan struct{})
+ w.shutdownTimeout.Store(int64(shutdownDefaultTimeout))
w.workerMaxNum = queueSetting.MaxWorkers
w.batchLength = queueSetting.BatchLength
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go
index da9451cd77..e60120162a 100644
--- a/modules/queue/workerqueue_test.go
+++ b/modules/queue/workerqueue_test.go
@@ -16,17 +16,9 @@ import (
)
func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
- var stop func()
- started := make(chan struct{})
- stopped := make(chan struct{})
- go func() {
- q.Run(func(f func()) { stop = f; close(started) }, nil)
- close(stopped)
- }()
- <-started
+ go q.Run()
return func() {
- stop()
- <-stopped
+ q.ShutdownWait(1 * time.Second)
}
}
@@ -57,7 +49,7 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) {
return unhandled
}
- q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", queueSetting, handler, false)
+ q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < queueSetting.Length; i++ {
testRecorder.Record("push:%v", i)
@@ -145,7 +137,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
return nil
}
- q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+ q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
stop := runWorkerPoolQueue(q)
for i := 0; i < testCount; i++ {
_ = q.Push("task-" + strconv.Itoa(i))
@@ -169,7 +161,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
return nil
}
- q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+ q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
stop := runWorkerPoolQueue(q)
assert.NoError(t, q.FlushWithContext(context.Background(), 0))
stop()
@@ -194,7 +186,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
return nil
}
- q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
+ q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < 5; i++ {
assert.NoError(t, q.Push(i))
@@ -210,7 +202,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
assert.EqualValues(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
stop()
- q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
+ q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
stop = runWorkerPoolQueue(q)
for i := 0; i < 15; i++ {
assert.NoError(t, q.Push(i))
@@ -238,23 +230,23 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
if items[0] == 0 {
close(handlerCalled)
}
- time.Sleep(100 * time.Millisecond)
+ time.Sleep(400 * time.Millisecond)
return items
}
qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20}
- q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+ q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
stop := runWorkerPoolQueue(q)
for i := 0; i < qs.Length; i++ {
assert.NoError(t, q.Push(i))
}
<-handlerCalled
- time.Sleep(50 * time.Millisecond) // wait for a while to make sure all workers are active
+ time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active
assert.EqualValues(t, 4, q.GetWorkerActiveNumber())
stop() // stop triggers shutdown
assert.EqualValues(t, 0, q.GetWorkerActiveNumber())
// no item was ever handled, so we still get all of them again
- q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+ q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
assert.EqualValues(t, 20, q.GetQueueItemNumber())
}
diff --git a/routers/web/admin/queue_tester.go b/routers/web/admin/queue_tester.go
index 96373c4d5f..8f713b3bb1 100644
--- a/routers/web/admin/queue_tester.go
+++ b/routers/web/admin/queue_tester.go
@@ -4,12 +4,13 @@
package admin
import (
- gocontext "context"
+ "runtime/pprof"
"sync"
"time"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
)
@@ -21,6 +22,7 @@ var testQueueOnce sync.Once
// developers could see the queue length / worker number / items number on the admin page and try to remove the items
func initTestQueueOnce() {
testQueueOnce.Do(func() {
+ ctx, _, finished := process.GetManager().AddTypedContext(graceful.GetManager().ShutdownContext(), "TestQueue", process.SystemProcessType, false)
qs := setting.QueueSettings{
Name: "test-queue",
Type: "channel",
@@ -28,7 +30,7 @@ func initTestQueueOnce() {
BatchLength: 2,
MaxWorkers: 3,
}
- testQueue, err := queue.NewWorkerPoolQueueBySetting("test-queue", qs, func(t ...int64) (unhandled []int64) {
+ testQueue, err := queue.NewWorkerPoolQueueWithContext(ctx, "test-queue", qs, func(t ...int64) (unhandled []int64) {
for range t {
select {
case <-graceful.GetManager().ShutdownContext().Done():
@@ -44,8 +46,11 @@ func initTestQueueOnce() {
queue.GetManager().AddManagedQueue(testQueue)
testQueue.SetWorkerMaxNumber(5)
- go graceful.GetManager().RunWithShutdownFns(testQueue.Run)
- go graceful.GetManager().RunWithShutdownContext(func(ctx gocontext.Context) {
+ go graceful.GetManager().RunWithCancel(testQueue)
+ go func() {
+ pprof.SetGoroutineLabels(ctx)
+ defer finished()
+
cnt := int64(0)
adding := true
for {
@@ -67,6 +72,6 @@ func initTestQueueOnce() {
}
}
}
- })
+ }()
})
}
diff --git a/services/actions/init.go b/services/actions/init.go
index 8a9a30084a..26573c1681 100644
--- a/services/actions/init.go
+++ b/services/actions/init.go
@@ -5,6 +5,7 @@ package actions
import (
"code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
@@ -15,8 +16,11 @@ func Init() {
return
}
- jobEmitterQueue = queue.CreateUniqueQueue("actions_ready_job", jobEmitterQueueHandler)
- go graceful.GetManager().RunWithShutdownFns(jobEmitterQueue.Run)
+ jobEmitterQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "actions_ready_job", jobEmitterQueueHandler)
+ if jobEmitterQueue == nil {
+ log.Fatal("Unable to create actions_ready_job queue")
+ }
+ go graceful.GetManager().RunWithCancel(jobEmitterQueue)
notification.RegisterNotifier(NewNotifier())
}
diff --git a/services/automerge/automerge.go b/services/automerge/automerge.go
index f001a6ccc5..bf713c4431 100644
--- a/services/automerge/automerge.go
+++ b/services/automerge/automerge.go
@@ -29,11 +29,11 @@ var prAutoMergeQueue *queue.WorkerPoolQueue[string]
// Init runs the task queue to that handles auto merges
func Init() error {
- prAutoMergeQueue = queue.CreateUniqueQueue("pr_auto_merge", handler)
+ prAutoMergeQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "pr_auto_merge", handler)
if prAutoMergeQueue == nil {
- return fmt.Errorf("Unable to create pr_auto_merge Queue")
+ return fmt.Errorf("unable to create pr_auto_merge queue")
}
- go graceful.GetManager().RunWithShutdownFns(prAutoMergeQueue.Run)
+ go graceful.GetManager().RunWithCancel(prAutoMergeQueue)
return nil
}
diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go
index 5aeda9ed79..ee4721d438 100644
--- a/services/mailer/mailer.go
+++ b/services/mailer/mailer.go
@@ -401,7 +401,9 @@ func NewContext(ctx context.Context) {
Sender = &smtpSender{}
}
- mailQueue = queue.CreateSimpleQueue("mail", func(items ...*Message) []*Message {
+ subjectTemplates, bodyTemplates = templates.Mailer(ctx)
+
+ mailQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "mail", func(items ...*Message) []*Message {
for _, msg := range items {
gomailMsg := msg.ToMessage()
log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info)
@@ -413,10 +415,10 @@ func NewContext(ctx context.Context) {
}
return nil
})
-
- go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
-
- subjectTemplates, bodyTemplates = templates.Mailer(ctx)
+ if mailQueue == nil {
+ log.Fatal("Unable to create mail queue")
+ }
+ go graceful.GetManager().RunWithCancel(mailQueue)
}
// SendAsync send mail asynchronously
diff --git a/services/pull/check.go b/services/pull/check.go
index 8bc2bdff1d..b5150ad9a8 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -384,13 +384,13 @@ func CheckPRsForBaseBranch(baseRepo *repo_model.Repository, baseBranchName strin
// Init runs the task queue to test all the checking status pull requests
func Init() error {
- prPatchCheckerQueue = queue.CreateUniqueQueue("pr_patch_checker", handler)
+ prPatchCheckerQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "pr_patch_checker", handler)
if prPatchCheckerQueue == nil {
- return fmt.Errorf("Unable to create pr_patch_checker Queue")
+ return fmt.Errorf("unable to create pr_patch_checker queue")
}
- go graceful.GetManager().RunWithShutdownFns(prPatchCheckerQueue.Run)
+ go graceful.GetManager().RunWithCancel(prPatchCheckerQueue)
go graceful.GetManager().RunWithShutdownContext(InitializePullRequests)
return nil
}
diff --git a/services/pull/check_test.go b/services/pull/check_test.go
index 52209b4d35..57ccb20033 100644
--- a/services/pull/check_test.go
+++ b/services/pull/check_test.go
@@ -5,6 +5,7 @@
package pull
import (
+ "context"
"strconv"
"testing"
"time"
@@ -31,7 +32,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
cfg, err := setting.GetQueueSettings(setting.CfgProvider, "pr_patch_checker")
assert.NoError(t, err)
- prPatchCheckerQueue, err = queue.NewWorkerPoolQueueBySetting("pr_patch_checker", cfg, testHandler, true)
+ prPatchCheckerQueue, err = queue.NewWorkerPoolQueueWithContext(context.Background(), "pr_patch_checker", cfg, testHandler, true)
assert.NoError(t, err)
pr := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2})
@@ -46,12 +47,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
assert.True(t, has)
assert.NoError(t, err)
- var queueShutdown, queueTerminate []func()
- go prPatchCheckerQueue.Run(func(shutdown func()) {
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- queueTerminate = append(queueTerminate, terminate)
- })
+ go prPatchCheckerQueue.Run()
select {
case id := <-idChan:
@@ -67,12 +63,6 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
pr = unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2})
assert.Equal(t, issues_model.PullRequestStatusChecking, pr.Status)
- for _, callback := range queueShutdown {
- callback()
- }
- for _, callback := range queueTerminate {
- callback()
- }
-
+ prPatchCheckerQueue.ShutdownWait(5 * time.Second)
prPatchCheckerQueue = nil
}
diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go
index 1c514a4112..2e3defee8d 100644
--- a/services/repository/archiver/archiver.go
+++ b/services/repository/archiver/archiver.go
@@ -297,7 +297,7 @@ func ArchiveRepository(request *ArchiveRequest) (*repo_model.RepoArchiver, error
var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
-// Init initlize archive
+// Init initializes archiver
func Init() error {
handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
for _, archiveReq := range items {
@@ -309,12 +309,11 @@ func Init() error {
return nil
}
- archiverQueue = queue.CreateUniqueQueue("repo-archive", handler)
+ archiverQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo-archive", handler)
if archiverQueue == nil {
- return errors.New("unable to create codes indexer queue")
+ return errors.New("unable to create repo-archive queue")
}
-
- go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run)
+ go graceful.GetManager().RunWithCancel(archiverQueue)
return nil
}
diff --git a/services/repository/push.go b/services/repository/push.go
index f213948916..571eedccb3 100644
--- a/services/repository/push.go
+++ b/services/repository/push.go
@@ -42,12 +42,11 @@ func handler(items ...[]*repo_module.PushUpdateOptions) [][]*repo_module.PushUpd
}
func initPushQueue() error {
- pushQueue = queue.CreateSimpleQueue("push_update", handler)
+ pushQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "push_update", handler)
if pushQueue == nil {
- return errors.New("unable to create push_update Queue")
+ return errors.New("unable to create push_update queue")
}
-
- go graceful.GetManager().RunWithShutdownFns(pushQueue.Run)
+ go graceful.GetManager().RunWithCancel(pushQueue)
return nil
}
diff --git a/services/task/task.go b/services/task/task.go
index 493586a645..11a47a68bb 100644
--- a/services/task/task.go
+++ b/services/task/task.go
@@ -37,14 +37,11 @@ func Run(t *admin_model.Task) error {
// Init will start the service to get all unfinished tasks and run them
func Init() error {
- taskQueue = queue.CreateSimpleQueue("task", handler)
-
+ taskQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "task", handler)
if taskQueue == nil {
- return fmt.Errorf("Unable to create Task Queue")
+ return fmt.Errorf("unable to create task queue")
}
-
- go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)
-
+ go graceful.GetManager().RunWithCancel(taskQueue)
return nil
}
diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go
index e817783e55..fd7a3d7fba 100644
--- a/services/webhook/deliver.go
+++ b/services/webhook/deliver.go
@@ -283,11 +283,11 @@ func Init() error {
},
}
- hookQueue = queue.CreateUniqueQueue("webhook_sender", handler)
+ hookQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "webhook_sender", handler)
if hookQueue == nil {
- return fmt.Errorf("Unable to create webhook_sender Queue")
+ return fmt.Errorf("unable to create webhook_sender queue")
}
- go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
+ go graceful.GetManager().RunWithCancel(hookQueue)
go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)