diff options
Diffstat (limited to 'modules/graceful/manager.go')
-rw-r--r-- | modules/graceful/manager.go | 154 |
1 files changed, 82 insertions, 72 deletions
diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 903d05ed21..8c3b95c4aa 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -54,8 +54,8 @@ func InitManager(ctx context.Context) { }) } -// CallbackWithContext is combined runnable and context to watch to see if the caller has finished -type CallbackWithContext func(ctx context.Context, callback func()) +// WithCallback is a runnable to call when the caller has finished +type WithCallback func(callback func()) // RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate // After the callback to atShutdown is called and is complete, the main function must return. @@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func()) // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. -type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func())) +type RunnableWithShutdownFns func(atShutdown, atTerminate func(func())) // RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks // After the callback to atShutdown is called and is complete, the main function must return. @@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.doShutdown() } }() - run(func(ctx context.Context, atShutdown func()) { - go func() { - select { - case <-g.IsShutdown(): + run(func(atShutdown func()) { + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2)) + g.doShutdown() + } + }() atShutdown() - case <-ctx.Done(): - return - } - }() - }, func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + }) + }, func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext) +type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback) // RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks // After the atShutdown channel is closed, the main function must return once shutdown is complete. @@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.doShutdown() } }() - run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + run(g.IsShutdown(), func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) { } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(terminate func()) { g.terminateWaitGroup.Add(1) - go func() { - defer g.terminateWaitGroup.Done() - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) - } - }() - select { - case <-g.IsTerminate(): + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtTerminate = append(g.toRunAtTerminate, + func() { + defer g.terminateWaitGroup.Done() + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() terminate() - case <-ctx.Done(): - } - }() + }) } // RunAtShutdown creates a go-routine to run the provided function at shutdown func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + shutdown() } - }() - select { - case <-g.IsShutdown(): - shutdown() - case <-ctx.Done(): - } - }() + }) } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) - } - }() - select { - case <-g.IsHammer(): +func (g *Manager) RunAtHammer(hammer func()) { + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtHammer = append(g.toRunAtHammer, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() hammer() - case <-ctx.Done(): - } - }() + }) } func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } g.lock.Lock() - close(g.shutdown) + g.shutdownCtxCancel() + for _, fn := range g.toRunAtShutdown { + go fn() + } g.lock.Unlock() if setting.GracefulHammerTime >= 0 { @@ -203,7 +212,7 @@ func (g *Manager) doShutdown() { g.doTerminate() g.WaitForTerminate() g.lock.Lock() - close(g.done) + g.doneCtxCancel() g.lock.Unlock() }() } @@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) g.lock.Lock() select { - case <-g.hammer: + case <-g.hammerCtx.Done(): default: log.Warn("Setting Hammer condition") - close(g.hammer) + g.hammerCtxCancel() + for _, fn := range g.toRunAtHammer { + go fn() + } } g.lock.Unlock() } @@ -226,10 +238,13 @@ func (g *Manager) doTerminate() { } g.lock.Lock() select { - case <-g.terminate: + case <-g.terminateCtx.Done(): default: log.Warn("Terminating") - close(g.terminate) + g.terminateCtxCancel() + for _, fn := range g.toRunAtTerminate { + go fn() + } } g.lock.Unlock() } @@ -242,7 +257,7 @@ func (g *Manager) IsChild() bool { // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { - return g.shutdown + return g.shutdownCtx.Done() } // IsHammer returns a channel which will be closed at hammer @@ -250,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} { // Servers running within the running server wait group should respond to IsHammer // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { - return g.hammer + return g.hammerCtx.Done() } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { - return g.terminate + return g.terminateCtx.Done() } // ServerDone declares a running server done and subtracts one from the @@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { - return g.done + return g.doneCtx.Done() } -// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +// Err allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Err() error { - select { - case <-g.Done(): - return ErrTerminate - default: - return nil - } + return g.doneCtx.Err() } -// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +// Value allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Value(key interface{}) interface{} { - return nil + return g.doneCtx.Value(key) } // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context func (g *Manager) Deadline() (deadline time.Time, ok bool) { - return + return g.doneCtx.Deadline() } |