diff options
31 files changed, 204 insertions, 263 deletions
diff --git a/modules/graceful/context.go b/modules/graceful/context.go index 6b5b207ff3..4fcbcb04b6 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -5,51 +5,8 @@ package graceful import ( "context" - "time" ) -// ChannelContext is a context that wraps a channel and error as a context -type ChannelContext struct { - done <-chan struct{} - err error -} - -// NewChannelContext creates a ChannelContext from a channel and error -func NewChannelContext(done <-chan struct{}, err error) *ChannelContext { - return &ChannelContext{ - done: done, - err: err, - } -} - -// Deadline returns the time when work done on behalf of this context -// should be canceled. There is no Deadline for a ChannelContext -func (ctx *ChannelContext) Deadline() (deadline time.Time, ok bool) { - return deadline, ok -} - -// Done returns the channel provided at the creation of this context. -// When closed, work done on behalf of this context should be canceled. -func (ctx *ChannelContext) Done() <-chan struct{} { - return ctx.done -} - -// Err returns nil, if Done is not closed. If Done is closed, -// Err returns the error provided at the creation of this context -func (ctx *ChannelContext) Err() error { - select { - case <-ctx.done: - return ctx.err - default: - return nil - } -} - -// Value returns nil for all calls as no values are or can be associated with this context -func (ctx *ChannelContext) Value(key interface{}) interface{} { - return nil -} - // ShutdownContext returns a context.Context that is Done at shutdown // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index d32788092d..3604c0a3f5 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -23,6 +23,11 @@ const ( stateTerminate ) +type RunCanceler interface { + Run() + Cancel() +} + // There are some places that could inherit sockets: // // * HTTP or HTTPS main listener @@ -55,46 +60,19 @@ func InitManager(ctx context.Context) { }) } -// WithCallback is a runnable to call when the caller has finished -type WithCallback func(callback func()) - -// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate -// After the callback to atShutdown is called and is complete, the main function must return. -// Similarly the callback function provided to atTerminate must return once termination is complete. -// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals -// - users must therefore be careful to only call these as necessary. -type RunnableWithShutdownFns func(atShutdown, atTerminate func(func())) - -// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks -// After the callback to atShutdown is called and is complete, the main function must return. -// Similarly the callback function provided to atTerminate must return once termination is complete. -// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals -// - users must therefore be careful to only call these as necessary. -func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { +// RunWithCancel helps to run a function with a custom context, the Cancel function will be called at shutdown +// The Cancel function should stop the Run function in predictable time. +func (g *Manager) RunWithCancel(rc RunCanceler) { + g.RunAtShutdown(context.Background(), rc.Cancel) g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() defer func() { if err := recover(); err != nil { - log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2)) + log.Critical("PANIC during RunWithCancel: %v\nStacktrace: %s", err, log.Stack(2)) g.doShutdown() } }() - run(func(atShutdown func()) { - g.lock.Lock() - defer g.lock.Unlock() - g.toRunAtShutdown = append(g.toRunAtShutdown, - func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2)) - g.doShutdown() - } - }() - atShutdown() - }) - }, func(atTerminate func()) { - g.RunAtTerminate(atTerminate) - }) + rc.Run() } // RunWithShutdownContext takes a function that has a context to watch for shutdown. @@ -151,21 +129,6 @@ func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { }) } -// RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *Manager) RunAtHammer(hammer func()) { - g.lock.Lock() - defer g.lock.Unlock() - g.toRunAtHammer = append(g.toRunAtHammer, - func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) - } - }() - hammer() - }) -} - func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { g.DoImmediateHammer() @@ -206,9 +169,6 @@ func (g *Manager) doHammerTime(d time.Duration) { g.hammerCtxCancel() atHammerCtx := pprof.WithLabels(g.terminateCtx, pprof.Labels("graceful-lifecycle", "post-hammer")) pprof.SetGoroutineLabels(atHammerCtx) - for _, fn := range g.toRunAtHammer { - go fn() - } } g.lock.Unlock() } diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index f949abfd21..b1fd6da76d 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -41,7 +41,6 @@ type Manager struct { terminateWaitGroup sync.WaitGroup toRunAtShutdown []func() - toRunAtHammer []func() toRunAtTerminate []func() } diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index f3606084aa..f676f86d04 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -50,7 +50,6 @@ type Manager struct { shutdownRequested chan struct{} toRunAtShutdown []func() - toRunAtHammer []func() toRunAtTerminate []func() } diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index e9b8e76500..f38fd6000c 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -166,7 +166,7 @@ func Init() { handler := func(items ...*IndexerData) (unhandled []*IndexerData) { idx, err := indexer.get() if idx == nil || err != nil { - log.Error("Codes indexer handler: unable to get indexer!") + log.Warn("Codes indexer handler: indexer is not ready, retry later.") return items } @@ -201,7 +201,7 @@ func Init() { return unhandled } - indexerQueue = queue.CreateUniqueQueue("code_indexer", handler) + indexerQueue = queue.CreateUniqueQueue(ctx, "code_indexer", handler) if indexerQueue == nil { log.Fatal("Unable to create codes indexer queue") } @@ -259,7 +259,7 @@ func Init() { indexer.set(rIndexer) // Start processing the queue - go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run) + go graceful.GetManager().RunWithCancel(indexerQueue) if populate { go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 76ff80ffca..f36ea10935 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -102,7 +102,7 @@ var ( func InitIssueIndexer(syncReindex bool) { ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false) - waitChannel := make(chan time.Duration, 1) + indexerInitWaitChannel := make(chan time.Duration, 1) // Create the Queue switch setting.Indexer.IssueType { @@ -110,7 +110,7 @@ func InitIssueIndexer(syncReindex bool) { handler := func(items ...*IndexerData) (unhandled []*IndexerData) { indexer := holder.get() if indexer == nil { - log.Error("Issue indexer handler: unable to get indexer.") + log.Warn("Issue indexer handler: indexer is not ready, retry later.") return items } toIndex := make([]*IndexerData, 0, len(items)) @@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) { return unhandled } - issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler) + issueIndexerQueue = queue.CreateSimpleQueue(ctx, "issue_indexer", handler) if issueIndexerQueue == nil { log.Fatal("Unable to create issue indexer queue") } default: - issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil) + issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil) } + graceful.GetManager().RunAtTerminate(finished) + // Create the Indexer go func() { pprof.SetGoroutineLabels(ctx) @@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) { if issueIndexer != nil { issueIndexer.Close() } - finished() log.Info("PID: %d Issue Indexer closed", os.Getpid()) }) log.Debug("Created Bleve Indexer") case "elasticsearch": - graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { - pprof.SetGoroutineLabels(ctx) - issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() - if err != nil { - log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) - } - populate = !exist - holder.set(issueIndexer) - atTerminate(finished) - }) + issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) + if err != nil { + log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) + } + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) + } + populate = !exist + holder.set(issueIndexer) case "db": issueIndexer := &DBIndexer{} holder.set(issueIndexer) - graceful.GetManager().RunAtTerminate(finished) case "meilisearch": - graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { - pprof.SetGoroutineLabels(ctx) - issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() - if err != nil { - log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) - } - populate = !exist - holder.set(issueIndexer) - atTerminate(finished) - }) + issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) + if err != nil { + log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) + } + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) + } + populate = !exist + holder.set(issueIndexer) default: holder.cancel() log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } // Start processing the queue - go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run) + go graceful.GetManager().RunWithCancel(issueIndexerQueue) // Populate the index if populate { @@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) { go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } } - waitChannel <- time.Since(start) - close(waitChannel) + + indexerInitWaitChannel <- time.Since(start) + close(indexerInitWaitChannel) }() if syncReindex { select { - case <-waitChannel: + case <-indexerInitWaitChannel: case <-graceful.GetManager().IsShutdown(): } } else if setting.Indexer.StartupTimeout > 0 { @@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) { timeout += setting.GracefulHammerTime } select { - case duration := <-waitChannel: + case duration := <-indexerInitWaitChannel: log.Info("Issue Indexer Initialization took %v", duration) case <-graceful.GetManager().IsShutdown(): log.Warn("Shutdown occurred before issue index initialisation was complete") diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go index 46438925e4..d002bd57cf 100644 --- a/modules/indexer/stats/queue.go +++ b/modules/indexer/stats/queue.go @@ -29,13 +29,11 @@ func handler(items ...int64) []int64 { } func initStatsQueue() error { - statsQueue = queue.CreateUniqueQueue("repo_stats_update", handler) + statsQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo_stats_update", handler) if statsQueue == nil { - return fmt.Errorf("Unable to create repo_stats_update Queue") + return fmt.Errorf("unable to create repo_stats_update queue") } - - go graceful.GetManager().RunWithShutdownFns(statsQueue.Run) - + go graceful.GetManager().RunWithCancel(statsQueue) return nil } diff --git a/modules/log/event_writer_base.go b/modules/log/event_writer_base.go index f61d9a7b9d..1d45d579c0 100644 --- a/modules/log/event_writer_base.go +++ b/modules/log/event_writer_base.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "regexp" + "runtime/pprof" "time" ) @@ -143,9 +144,17 @@ func eventWriterStartGo(ctx context.Context, w EventWriter, shared bool) { } w.Base().shared = shared w.Base().stopped = make(chan struct{}) + + ctxDesc := "Logger: EventWriter: " + w.GetWriterName() + if shared { + ctxDesc = "Logger: EventWriter (shared): " + w.GetWriterName() + } + writerCtx, writerCancel := newContext(ctx, ctxDesc) go func() { + defer writerCancel() defer close(w.Base().stopped) - w.Run(ctx) + pprof.SetGoroutineLabels(writerCtx) + w.Run(writerCtx) }() } diff --git a/modules/log/event_writer_conn_test.go b/modules/log/event_writer_conn_test.go index e08ec025a3..69e87aa8c4 100644 --- a/modules/log/event_writer_conn_test.go +++ b/modules/log/event_writer_conn_test.go @@ -40,7 +40,7 @@ func TestConnLogger(t *testing.T) { level := INFO flags := LstdFlags | LUTC | Lfuncname - logger := NewLoggerWithWriters(context.Background(), NewEventWriterConn("test-conn", WriterMode{ + logger := NewLoggerWithWriters(context.Background(), "test", NewEventWriterConn("test-conn", WriterMode{ Level: level, Prefix: prefix, Flags: FlagsFromBits(flags), diff --git a/modules/log/init.go b/modules/log/init.go index 798ba86410..38a3ad60a5 100644 --- a/modules/log/init.go +++ b/modules/log/init.go @@ -4,14 +4,19 @@ package log import ( + "context" "runtime" "strings" + "sync/atomic" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/util/rotatingfilewriter" ) -var projectPackagePrefix string +var ( + projectPackagePrefix string + processTraceDisabled atomic.Int64 +) func init() { _, filename, _, _ := runtime.Caller(0) @@ -24,6 +29,10 @@ func init() { rotatingfilewriter.ErrorPrintf = FallbackErrorf process.Trace = func(start bool, pid process.IDType, description string, parentPID process.IDType, typ string) { + // the logger manager has its own mutex lock, so it's safe to use "Load" here + if processTraceDisabled.Load() != 0 { + return + } if start && parentPID != "" { Log(1, TRACE, "Start %s: %s (from %s) (%s)", NewColoredValue(pid, FgHiYellow), description, NewColoredValue(parentPID, FgYellow), NewColoredValue(typ, Reset)) } else if start { @@ -33,3 +42,11 @@ func init() { } } } + +func newContext(parent context.Context, desc string) (ctx context.Context, cancel context.CancelFunc) { + // the "process manager" also calls "log.Trace()" to output logs, so if we want to create new contexts by the manager, we need to disable the trace temporarily + processTraceDisabled.Add(1) + defer processTraceDisabled.Add(-1) + ctx, _, cancel = process.GetManager().AddTypedContext(parent, desc, process.SystemProcessType, false) + return ctx, cancel +} diff --git a/modules/log/logger_impl.go b/modules/log/logger_impl.go index c7e8fde3c0..abd72d326f 100644 --- a/modules/log/logger_impl.go +++ b/modules/log/logger_impl.go @@ -228,9 +228,9 @@ func (l *LoggerImpl) GetLevel() Level { return Level(l.level.Load()) } -func NewLoggerWithWriters(ctx context.Context, writer ...EventWriter) *LoggerImpl { +func NewLoggerWithWriters(ctx context.Context, name string, writer ...EventWriter) *LoggerImpl { l := &LoggerImpl{} - l.ctx, l.ctxCancel = context.WithCancel(ctx) + l.ctx, l.ctxCancel = newContext(ctx, "Logger: "+name) l.LevelLogger = BaseLoggerToGeneralLogger(l) l.eventWriters = map[string]EventWriter{} l.syncLevelInternal() diff --git a/modules/log/logger_test.go b/modules/log/logger_test.go index a91b4d23af..70222f64f5 100644 --- a/modules/log/logger_test.go +++ b/modules/log/logger_test.go @@ -53,7 +53,7 @@ func newDummyWriter(name string, level Level, delay time.Duration) *dummyWriter } func TestLogger(t *testing.T) { - logger := NewLoggerWithWriters(context.Background()) + logger := NewLoggerWithWriters(context.Background(), "test") dump := logger.DumpWriters() assert.EqualValues(t, 0, len(dump)) @@ -88,7 +88,7 @@ func TestLogger(t *testing.T) { } func TestLoggerPause(t *testing.T) { - logger := NewLoggerWithWriters(context.Background()) + logger := NewLoggerWithWriters(context.Background(), "test") w1 := newDummyWriter("dummy-1", DEBUG, 0) logger.AddWriters(w1) @@ -117,7 +117,7 @@ func (t testLogString) LogString() string { } func TestLoggerLogString(t *testing.T) { - logger := NewLoggerWithWriters(context.Background()) + logger := NewLoggerWithWriters(context.Background(), "test") w1 := newDummyWriter("dummy-1", DEBUG, 0) w1.Mode.Colorize = true @@ -130,7 +130,7 @@ func TestLoggerLogString(t *testing.T) { } func TestLoggerExpressionFilter(t *testing.T) { - logger := NewLoggerWithWriters(context.Background()) + logger := NewLoggerWithWriters(context.Background(), "test") w1 := newDummyWriter("dummy-1", DEBUG, 0) w1.Mode.Expression = "foo.*" diff --git a/modules/log/manager.go b/modules/log/manager.go index bbaef7eb20..b5d6cbf8e1 100644 --- a/modules/log/manager.go +++ b/modules/log/manager.go @@ -39,7 +39,7 @@ func (m *LoggerManager) GetLogger(name string) *LoggerImpl { logger := m.loggers[name] if logger == nil { - logger = NewLoggerWithWriters(m.ctx) + logger = NewLoggerWithWriters(m.ctx, name) m.loggers[name] = logger if name == DEFAULT { m.defaultLogger.Store(logger) @@ -137,6 +137,6 @@ func GetManager() *LoggerManager { func NewManager() *LoggerManager { m := &LoggerManager{writers: map[string]EventWriter{}, loggers: map[string]*LoggerImpl{}} - m.ctx, m.ctxCancel = context.WithCancel(context.Background()) + m.ctx, m.ctxCancel = newContext(context.Background(), "LoggerManager") return m } diff --git a/modules/mirror/mirror.go b/modules/mirror/mirror.go index 73e591adba..0d9a624730 100644 --- a/modules/mirror/mirror.go +++ b/modules/mirror/mirror.go @@ -33,9 +33,11 @@ func StartSyncMirrors(queueHandle func(data ...*SyncRequest) []*SyncRequest) { if !setting.Mirror.Enabled { return } - mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle) - - go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) + mirrorQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "mirror", queueHandle) + if mirrorQueue == nil { + log.Fatal("Unable to create mirror queue") + } + go graceful.GetManager().RunWithCancel(mirrorQueue) } // AddPullMirrorToQueue adds repoID to mirror queue diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go index a4576b6791..6c5f22c122 100644 --- a/modules/notification/ui/ui.go +++ b/modules/notification/ui/ui.go @@ -37,7 +37,10 @@ var _ base.Notifier = ¬ificationService{} // NewNotifier create a new notificationService notifier func NewNotifier() base.Notifier { ns := ¬ificationService{} - ns.issueQueue = queue.CreateSimpleQueue("notification-service", handler) + ns.issueQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "notification-service", handler) + if ns.issueQueue == nil { + log.Fatal("Unable to create notification-service queue") + } return ns } @@ -51,7 +54,7 @@ func handler(items ...issueNotificationOpts) []issueNotificationOpts { } func (ns *notificationService) Run() { - go graceful.GetManager().RunWithShutdownFns(ns.issueQueue.Run) + go graceful.GetManager().RunWithCancel(ns.issueQueue) // TODO: using "go" here doesn't seem right, just leave it as old code } func (ns *notificationService) NotifyCreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository, diff --git a/modules/process/manager.go b/modules/process/manager.go index fdfca3db7a..25d503c594 100644 --- a/modules/process/manager.go +++ b/modules/process/manager.go @@ -167,6 +167,7 @@ func (pm *Manager) Add(ctx context.Context, description string, cancel context.C pm.processMap[pid] = process pm.mutex.Unlock() + Trace(true, pid, description, parentPID, processType) pprofCtx := pprof.WithLabels(ctx, pprof.Labels(DescriptionPProfLabel, description, PPIDPProfLabel, string(parentPID), PIDPProfLabel, string(pid), ProcessTypePProfLabel, processType)) @@ -200,10 +201,16 @@ func (pm *Manager) nextPID() (start time.Time, pid IDType) { } func (pm *Manager) remove(process *process) { + deleted := false + pm.mutex.Lock() - defer pm.mutex.Unlock() - if p := pm.processMap[process.PID]; p == process { + if pm.processMap[process.PID] == process { delete(pm.processMap, process.PID) + deleted = true + } + pm.mutex.Unlock() + + if deleted { Trace(false, process.PID, process.Description, process.ParentPID, process.Type) } } diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 95b3bad57b..8b964c0c28 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -88,22 +88,22 @@ func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error { } // CreateSimpleQueue creates a simple queue from global setting config provider by name -func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] { - return createWorkerPoolQueue(name, setting.CfgProvider, handler, false) +func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] { + return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false) } // CreateUniqueQueue creates a unique queue from global setting config provider by name -func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] { - return createWorkerPoolQueue(name, setting.CfgProvider, handler, true) +func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] { + return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true) } -func createWorkerPoolQueue[T any](name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] { +func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] { queueSetting, err := setting.GetQueueSettings(cfgProvider, name) if err != nil { log.Error("Failed to get queue settings for %q: %v", name, err) return nil } - w, err := NewWorkerPoolQueueBySetting(name, queueSetting, handler, unique) + w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique) if err != nil { log.Error("Failed to create queue %q: %v", name, err) return nil diff --git a/modules/queue/manager_test.go b/modules/queue/manager_test.go index 50265e27b6..1fd29f813f 100644 --- a/modules/queue/manager_test.go +++ b/modules/queue/manager_test.go @@ -29,7 +29,7 @@ func TestManager(t *testing.T) { if err != nil { return nil, err } - return NewWorkerPoolQueueBySetting(name, qs, func(s ...int) (unhandled []int) { return nil }, false) + return newWorkerPoolQueueForTest(name, qs, func(s ...int) (unhandled []int) { return nil }, false) } // test invalid CONN_STR @@ -80,7 +80,7 @@ MAX_WORKERS = 2 assert.NoError(t, err) - q1 := createWorkerPoolQueue[string]("no-such", cfgProvider, nil, false) + q1 := createWorkerPoolQueue[string](context.Background(), "no-such", cfgProvider, nil, false) assert.Equal(t, "no-such", q1.GetName()) assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataFullDir) @@ -96,7 +96,7 @@ MAX_WORKERS = 2 assert.Equal(t, "string", q1.GetItemTypeName()) qid1 := GetManager().qidCounter - q2 := createWorkerPoolQueue("sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false) + q2 := createWorkerPoolQueue(context.Background(), "sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false) assert.Equal(t, "sub", q2.GetName()) assert.Equal(t, "level", q2.GetType()) assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataFullDir) diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go index 7127ea1117..147a4f335e 100644 --- a/modules/queue/workergroup.go +++ b/modules/queue/workergroup.go @@ -5,6 +5,7 @@ package queue import ( "context" + "runtime/pprof" "sync" "sync/atomic" "time" @@ -13,9 +14,10 @@ import ( ) var ( - infiniteTimerC = make(chan time.Time) - batchDebounceDuration = 100 * time.Millisecond - workerIdleDuration = 1 * time.Second + infiniteTimerC = make(chan time.Time) + batchDebounceDuration = 100 * time.Millisecond + workerIdleDuration = 1 * time.Second + shutdownDefaultTimeout = 2 * time.Second unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test ) @@ -116,13 +118,15 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) { // If the queue is shutting down, it returns true and try to push the items // Otherwise it does nothing and returns false func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool { - ctxShutdown := q.ctxShutdown.Load() - if ctxShutdown == nil { + shutdownTimeout := time.Duration(q.shutdownTimeout.Load()) + if shutdownTimeout == 0 { return false } + ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer ctxShutdownCancel() for _, item := range items { // if there is still any error, the queue can do nothing instead of losing the items - if err := q.baseQueue.PushItem(*ctxShutdown, q.marshal(item)); err != nil { + if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil { log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err) } } @@ -246,6 +250,8 @@ var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip re // doRun is the main loop of the queue. All related "doXxx" functions are executed in its context. func (q *WorkerPoolQueue[T]) doRun() { + pprof.SetGoroutineLabels(q.ctxRun) + log.Debug("Queue %q starts running", q.GetName()) defer log.Debug("Queue %q stops running", q.GetName()) @@ -271,8 +277,8 @@ func (q *WorkerPoolQueue[T]) doRun() { } } - ctxShutdownPtr := q.ctxShutdown.Load() - if ctxShutdownPtr != nil { + shutdownTimeout := time.Duration(q.shutdownTimeout.Load()) + if shutdownTimeout != 0 { // if there is a shutdown context, try to push the items back to the base queue q.basePushForShutdown(unhandled...) workerDone := make(chan struct{}) @@ -280,7 +286,7 @@ func (q *WorkerPoolQueue[T]) doRun() { go func() { wg.wg.Wait(); close(workerDone) }() select { case <-workerDone: - case <-(*ctxShutdownPtr).Done(): + case <-time.After(shutdownTimeout): log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName()) } } else { diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go index 5695c6cc23..e0d5183bd9 100644 --- a/modules/queue/workerqueue.go +++ b/modules/queue/workerqueue.go @@ -10,9 +10,9 @@ import ( "sync/atomic" "time" - "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" ) @@ -21,8 +21,9 @@ import ( type WorkerPoolQueue[T any] struct { ctxRun context.Context ctxRunCancel context.CancelFunc - ctxShutdown atomic.Pointer[context.Context] - shutdownDone chan struct{} + + shutdownDone chan struct{} + shutdownTimeout atomic.Int64 // in case some buggy handlers (workers) would hang forever, "shutdown" should finish in predictable time origHandler HandlerFuncT[T] safeHandler HandlerFuncT[T] @@ -175,22 +176,19 @@ func (q *WorkerPoolQueue[T]) Has(data T) (bool, error) { return q.baseQueue.HasItem(q.ctxRun, q.marshal(data)) } -func (q *WorkerPoolQueue[T]) Run(atShutdown, atTerminate func(func())) { - atShutdown(func() { - // in case some queue handlers are slow or have hanging bugs, at most wait for a short time - q.ShutdownWait(1 * time.Second) - }) +func (q *WorkerPoolQueue[T]) Run() { q.doRun() } +func (q *WorkerPoolQueue[T]) Cancel() { + q.ctxRunCancel() +} + // ShutdownWait shuts down the queue, waits for all workers to finish their jobs, and pushes the unhandled items back to the base queue // It waits for all workers (handlers) to finish their jobs, in case some buggy handlers would hang forever, a reasonable timeout is needed func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) { - shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), timeout) - defer shutdownCtxCancel() - if q.ctxShutdown.CompareAndSwap(nil, &shutdownCtx) { - q.ctxRunCancel() - } + q.shutdownTimeout.Store(int64(timeout)) + q.ctxRunCancel() <-q.shutdownDone } @@ -207,7 +205,11 @@ func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQu } } -func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) { +func newWorkerPoolQueueForTest[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) { + return NewWorkerPoolQueueWithContext(context.Background(), name, queueSetting, handler, unique) +} + +func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) { if handler == nil { log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name) queueSetting.Type = "dummy" @@ -224,10 +226,11 @@ func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueS } log.Trace("Created queue %q of type %q", name, queueType) - w.ctxRun, w.ctxRunCancel = context.WithCancel(graceful.GetManager().ShutdownContext()) + w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false) w.batchChan = make(chan []T) w.flushChan = make(chan flushType) w.shutdownDone = make(chan struct{}) + w.shutdownTimeout.Store(int64(shutdownDefaultTimeout)) w.workerMaxNum = queueSetting.MaxWorkers w.batchLength = queueSetting.BatchLength diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go index da9451cd77..e60120162a 100644 --- a/modules/queue/workerqueue_test.go +++ b/modules/queue/workerqueue_test.go @@ -16,17 +16,9 @@ import ( ) func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() { - var stop func() - started := make(chan struct{}) - stopped := make(chan struct{}) - go func() { - q.Run(func(f func()) { stop = f; close(started) }, nil) - close(stopped) - }() - <-started + go q.Run() return func() { - stop() - <-stopped + q.ShutdownWait(1 * time.Second) } } @@ -57,7 +49,7 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) { return unhandled } - q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", queueSetting, handler, false) + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false) stop := runWorkerPoolQueue(q) for i := 0; i < queueSetting.Length; i++ { testRecorder.Record("push:%v", i) @@ -145,7 +137,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett return nil } - q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true) + q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true) stop := runWorkerPoolQueue(q) for i := 0; i < testCount; i++ { _ = q.Push("task-" + strconv.Itoa(i)) @@ -169,7 +161,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett return nil } - q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true) + q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true) stop := runWorkerPoolQueue(q) assert.NoError(t, q.FlushWithContext(context.Background(), 0)) stop() @@ -194,7 +186,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) { return nil } - q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false) + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false) stop := runWorkerPoolQueue(q) for i := 0; i < 5; i++ { assert.NoError(t, q.Push(i)) @@ -210,7 +202,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) { assert.EqualValues(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working stop() - q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false) + q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false) stop = runWorkerPoolQueue(q) for i := 0; i < 15; i++ { assert.NoError(t, q.Push(i)) @@ -238,23 +230,23 @@ func TestWorkerPoolQueueShutdown(t *testing.T) { if items[0] == 0 { close(handlerCalled) } - time.Sleep(100 * time.Millisecond) + time.Sleep(400 * time.Millisecond) return items } qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20} - q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false) + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) stop := runWorkerPoolQueue(q) for i := 0; i < qs.Length; i++ { assert.NoError(t, q.Push(i)) } <-handlerCalled - time.Sleep(50 * time.Millisecond) // wait for a while to make sure all workers are active + time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active assert.EqualValues(t, 4, q.GetWorkerActiveNumber()) stop() // stop triggers shutdown assert.EqualValues(t, 0, q.GetWorkerActiveNumber()) // no item was ever handled, so we still get all of them again - q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false) + q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) assert.EqualValues(t, 20, q.GetQueueItemNumber()) } diff --git a/routers/web/admin/queue_tester.go b/routers/web/admin/queue_tester.go index 96373c4d5f..8f713b3bb1 100644 --- a/routers/web/admin/queue_tester.go +++ b/routers/web/admin/queue_tester.go @@ -4,12 +4,13 @@ package admin import ( - gocontext "context" + "runtime/pprof" "sync" "time" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" ) @@ -21,6 +22,7 @@ var testQueueOnce sync.Once // developers could see the queue length / worker number / items number on the admin page and try to remove the items func initTestQueueOnce() { testQueueOnce.Do(func() { + ctx, _, finished := process.GetManager().AddTypedContext(graceful.GetManager().ShutdownContext(), "TestQueue", process.SystemProcessType, false) qs := setting.QueueSettings{ Name: "test-queue", Type: "channel", @@ -28,7 +30,7 @@ func initTestQueueOnce() { BatchLength: 2, MaxWorkers: 3, } - testQueue, err := queue.NewWorkerPoolQueueBySetting("test-queue", qs, func(t ...int64) (unhandled []int64) { + testQueue, err := queue.NewWorkerPoolQueueWithContext(ctx, "test-queue", qs, func(t ...int64) (unhandled []int64) { for range t { select { case <-graceful.GetManager().ShutdownContext().Done(): @@ -44,8 +46,11 @@ func initTestQueueOnce() { queue.GetManager().AddManagedQueue(testQueue) testQueue.SetWorkerMaxNumber(5) - go graceful.GetManager().RunWithShutdownFns(testQueue.Run) - go graceful.GetManager().RunWithShutdownContext(func(ctx gocontext.Context) { + go graceful.GetManager().RunWithCancel(testQueue) + go func() { + pprof.SetGoroutineLabels(ctx) + defer finished() + cnt := int64(0) adding := true for { @@ -67,6 +72,6 @@ func initTestQueueOnce() { } } } - }) + }() }) } diff --git a/services/actions/init.go b/services/actions/init.go index 8a9a30084a..26573c1681 100644 --- a/services/actions/init.go +++ b/services/actions/init.go @@ -5,6 +5,7 @@ package actions import ( "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" @@ -15,8 +16,11 @@ func Init() { return } - jobEmitterQueue = queue.CreateUniqueQueue("actions_ready_job", jobEmitterQueueHandler) - go graceful.GetManager().RunWithShutdownFns(jobEmitterQueue.Run) + jobEmitterQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "actions_ready_job", jobEmitterQueueHandler) + if jobEmitterQueue == nil { + log.Fatal("Unable to create actions_ready_job queue") + } + go graceful.GetManager().RunWithCancel(jobEmitterQueue) notification.RegisterNotifier(NewNotifier()) } diff --git a/services/automerge/automerge.go b/services/automerge/automerge.go index f001a6ccc5..bf713c4431 100644 --- a/services/automerge/automerge.go +++ b/services/automerge/automerge.go @@ -29,11 +29,11 @@ var prAutoMergeQueue *queue.WorkerPoolQueue[string] // Init runs the task queue to that handles auto merges func Init() error { - prAutoMergeQueue = queue.CreateUniqueQueue("pr_auto_merge", handler) + prAutoMergeQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "pr_auto_merge", handler) if prAutoMergeQueue == nil { - return fmt.Errorf("Unable to create pr_auto_merge Queue") + return fmt.Errorf("unable to create pr_auto_merge queue") } - go graceful.GetManager().RunWithShutdownFns(prAutoMergeQueue.Run) + go graceful.GetManager().RunWithCancel(prAutoMergeQueue) return nil } diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go index 5aeda9ed79..ee4721d438 100644 --- a/services/mailer/mailer.go +++ b/services/mailer/mailer.go @@ -401,7 +401,9 @@ func NewContext(ctx context.Context) { Sender = &smtpSender{} } - mailQueue = queue.CreateSimpleQueue("mail", func(items ...*Message) []*Message { + subjectTemplates, bodyTemplates = templates.Mailer(ctx) + + mailQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "mail", func(items ...*Message) []*Message { for _, msg := range items { gomailMsg := msg.ToMessage() log.Trace("New e-mail sending request %s: %s", gomailMsg.GetHeader("To"), msg.Info) @@ -413,10 +415,10 @@ func NewContext(ctx context.Context) { } return nil }) - - go graceful.GetManager().RunWithShutdownFns(mailQueue.Run) - - subjectTemplates, bodyTemplates = templates.Mailer(ctx) + if mailQueue == nil { + log.Fatal("Unable to create mail queue") + } + go graceful.GetManager().RunWithCancel(mailQueue) } // SendAsync send mail asynchronously diff --git a/services/pull/check.go b/services/pull/check.go index 8bc2bdff1d..b5150ad9a8 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -384,13 +384,13 @@ func CheckPRsForBaseBranch(baseRepo *repo_model.Repository, baseBranchName strin // Init runs the task queue to test all the checking status pull requests func Init() error { - prPatchCheckerQueue = queue.CreateUniqueQueue("pr_patch_checker", handler) + prPatchCheckerQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "pr_patch_checker", handler) if prPatchCheckerQueue == nil { - return fmt.Errorf("Unable to create pr_patch_checker Queue") + return fmt.Errorf("unable to create pr_patch_checker queue") } - go graceful.GetManager().RunWithShutdownFns(prPatchCheckerQueue.Run) + go graceful.GetManager().RunWithCancel(prPatchCheckerQueue) go graceful.GetManager().RunWithShutdownContext(InitializePullRequests) return nil } diff --git a/services/pull/check_test.go b/services/pull/check_test.go index 52209b4d35..57ccb20033 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -5,6 +5,7 @@ package pull import ( + "context" "strconv" "testing" "time" @@ -31,7 +32,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { cfg, err := setting.GetQueueSettings(setting.CfgProvider, "pr_patch_checker") assert.NoError(t, err) - prPatchCheckerQueue, err = queue.NewWorkerPoolQueueBySetting("pr_patch_checker", cfg, testHandler, true) + prPatchCheckerQueue, err = queue.NewWorkerPoolQueueWithContext(context.Background(), "pr_patch_checker", cfg, testHandler, true) assert.NoError(t, err) pr := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2}) @@ -46,12 +47,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { assert.True(t, has) assert.NoError(t, err) - var queueShutdown, queueTerminate []func() - go prPatchCheckerQueue.Run(func(shutdown func()) { - queueShutdown = append(queueShutdown, shutdown) - }, func(terminate func()) { - queueTerminate = append(queueTerminate, terminate) - }) + go prPatchCheckerQueue.Run() select { case id := <-idChan: @@ -67,12 +63,6 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { pr = unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2}) assert.Equal(t, issues_model.PullRequestStatusChecking, pr.Status) - for _, callback := range queueShutdown { - callback() - } - for _, callback := range queueTerminate { - callback() - } - + prPatchCheckerQueue.ShutdownWait(5 * time.Second) prPatchCheckerQueue = nil } diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go index 1c514a4112..2e3defee8d 100644 --- a/services/repository/archiver/archiver.go +++ b/services/repository/archiver/archiver.go @@ -297,7 +297,7 @@ func ArchiveRepository(request *ArchiveRequest) (*repo_model.RepoArchiver, error var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest] -// Init initlize archive +// Init initializes archiver func Init() error { handler := func(items ...*ArchiveRequest) []*ArchiveRequest { for _, archiveReq := range items { @@ -309,12 +309,11 @@ func Init() error { return nil } - archiverQueue = queue.CreateUniqueQueue("repo-archive", handler) + archiverQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo-archive", handler) if archiverQueue == nil { - return errors.New("unable to create codes indexer queue") + return errors.New("unable to create repo-archive queue") } - - go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run) + go graceful.GetManager().RunWithCancel(archiverQueue) return nil } diff --git a/services/repository/push.go b/services/repository/push.go index f213948916..571eedccb3 100644 --- a/services/repository/push.go +++ b/services/repository/push.go @@ -42,12 +42,11 @@ func handler(items ...[]*repo_module.PushUpdateOptions) [][]*repo_module.PushUpd } func initPushQueue() error { - pushQueue = queue.CreateSimpleQueue("push_update", handler) + pushQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "push_update", handler) if pushQueue == nil { - return errors.New("unable to create push_update Queue") + return errors.New("unable to create push_update queue") } - - go graceful.GetManager().RunWithShutdownFns(pushQueue.Run) + go graceful.GetManager().RunWithCancel(pushQueue) return nil } diff --git a/services/task/task.go b/services/task/task.go index 493586a645..11a47a68bb 100644 --- a/services/task/task.go +++ b/services/task/task.go @@ -37,14 +37,11 @@ func Run(t *admin_model.Task) error { // Init will start the service to get all unfinished tasks and run them func Init() error { - taskQueue = queue.CreateSimpleQueue("task", handler) - + taskQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "task", handler) if taskQueue == nil { - return fmt.Errorf("Unable to create Task Queue") + return fmt.Errorf("unable to create task queue") } - - go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) - + go graceful.GetManager().RunWithCancel(taskQueue) return nil } diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index e817783e55..fd7a3d7fba 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -283,11 +283,11 @@ func Init() error { }, } - hookQueue = queue.CreateUniqueQueue("webhook_sender", handler) + hookQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "webhook_sender", handler) if hookQueue == nil { - return fmt.Errorf("Unable to create webhook_sender Queue") + return fmt.Errorf("unable to create webhook_sender queue") } - go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) + go graceful.GetManager().RunWithCancel(hookQueue) go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue) |