diff options
author | zeripath <art27@cantab.net> | 2021-05-15 15:22:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-15 16:22:26 +0200 |
commit | ba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch) | |
tree | ddd9ff13b0da7b272b5a60445a997319cb0de882 /modules/graceful | |
parent | 9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff) | |
download | gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip |
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines
Coalesce the shutdownfns etc into a list of functions that get run at shutdown
rather then have them run at goroutines blocked on selects.
This may help reduce the background select/poll load in certain
configurations.
* The LevelDB queues can actually wait on empty instead of polling
Slight refactor to cause leveldb queues to wait on empty instead of polling.
* Shutdown the shadow level queue once it is empty
* Remove bytefifo additional goroutine for readToChan as it can just be run in run
* Remove additional removeWorkers goroutine for workers
* Simplify the AtShutdown and AtTerminate functions and add Channel Flusher
* Add shutdown flusher to CUQ
* move persistable channel shutdown stuff to Shutdown Fn
* Ensure that UPCQ has the correct config
* handle shutdown during the flushing
* reduce risk of race between zeroBoost and addWorkers
* prevent double shutdown
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/graceful')
-rw-r--r-- | modules/graceful/context.go | 23 | ||||
-rw-r--r-- | modules/graceful/manager.go | 154 | ||||
-rw-r--r-- | modules/graceful/manager_unix.go | 26 | ||||
-rw-r--r-- | modules/graceful/manager_windows.go | 28 |
4 files changed, 121 insertions, 110 deletions
diff --git a/modules/graceful/context.go b/modules/graceful/context.go index 1ad1109b4e..9d955329a4 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -6,17 +6,9 @@ package graceful import ( "context" - "fmt" "time" ) -// Errors for context.Err() -var ( - ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown") - ErrHammer = fmt.Errorf("Graceful Manager called Hammer") - ErrTerminate = fmt.Errorf("Graceful Manager called Terminate") -) - // ChannelContext is a context that wraps a channel and error as a context type ChannelContext struct { done <-chan struct{} @@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) ShutdownContext() context.Context { - return &ChannelContext{ - done: g.IsShutdown(), - err: ErrShutdown, - } + return g.shutdownCtx } // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) HammerContext() context.Context { - return &ChannelContext{ - done: g.IsHammer(), - err: ErrHammer, - } + return g.hammerCtx } // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. func (g *Manager) TerminateContext() context.Context { - return &ChannelContext{ - done: g.IsTerminate(), - err: ErrTerminate, - } + return g.terminateCtx } diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 903d05ed21..8c3b95c4aa 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -54,8 +54,8 @@ func InitManager(ctx context.Context) { }) } -// CallbackWithContext is combined runnable and context to watch to see if the caller has finished -type CallbackWithContext func(ctx context.Context, callback func()) +// WithCallback is a runnable to call when the caller has finished +type WithCallback func(callback func()) // RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate // After the callback to atShutdown is called and is complete, the main function must return. @@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func()) // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. -type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func())) +type RunnableWithShutdownFns func(atShutdown, atTerminate func(func())) // RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks // After the callback to atShutdown is called and is complete, the main function must return. @@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.doShutdown() } }() - run(func(ctx context.Context, atShutdown func()) { - go func() { - select { - case <-g.IsShutdown(): + run(func(atShutdown func()) { + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2)) + g.doShutdown() + } + }() atShutdown() - case <-ctx.Done(): - return - } - }() - }, func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + }) + }, func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext) +type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback) // RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks // After the atShutdown channel is closed, the main function must return once shutdown is complete. @@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.doShutdown() } }() - run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + run(g.IsShutdown(), func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) { } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(terminate func()) { g.terminateWaitGroup.Add(1) - go func() { - defer g.terminateWaitGroup.Done() - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) - } - }() - select { - case <-g.IsTerminate(): + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtTerminate = append(g.toRunAtTerminate, + func() { + defer g.terminateWaitGroup.Done() + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() terminate() - case <-ctx.Done(): - } - }() + }) } // RunAtShutdown creates a go-routine to run the provided function at shutdown func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + shutdown() } - }() - select { - case <-g.IsShutdown(): - shutdown() - case <-ctx.Done(): - } - }() + }) } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) - } - }() - select { - case <-g.IsHammer(): +func (g *Manager) RunAtHammer(hammer func()) { + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtHammer = append(g.toRunAtHammer, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() hammer() - case <-ctx.Done(): - } - }() + }) } func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } g.lock.Lock() - close(g.shutdown) + g.shutdownCtxCancel() + for _, fn := range g.toRunAtShutdown { + go fn() + } g.lock.Unlock() if setting.GracefulHammerTime >= 0 { @@ -203,7 +212,7 @@ func (g *Manager) doShutdown() { g.doTerminate() g.WaitForTerminate() g.lock.Lock() - close(g.done) + g.doneCtxCancel() g.lock.Unlock() }() } @@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) g.lock.Lock() select { - case <-g.hammer: + case <-g.hammerCtx.Done(): default: log.Warn("Setting Hammer condition") - close(g.hammer) + g.hammerCtxCancel() + for _, fn := range g.toRunAtHammer { + go fn() + } } g.lock.Unlock() } @@ -226,10 +238,13 @@ func (g *Manager) doTerminate() { } g.lock.Lock() select { - case <-g.terminate: + case <-g.terminateCtx.Done(): default: log.Warn("Terminating") - close(g.terminate) + g.terminateCtxCancel() + for _, fn := range g.toRunAtTerminate { + go fn() + } } g.lock.Unlock() } @@ -242,7 +257,7 @@ func (g *Manager) IsChild() bool { // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { - return g.shutdown + return g.shutdownCtx.Done() } // IsHammer returns a channel which will be closed at hammer @@ -250,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} { // Servers running within the running server wait group should respond to IsHammer // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { - return g.hammer + return g.hammerCtx.Done() } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { - return g.terminate + return g.terminateCtx.Done() } // ServerDone declares a running server done and subtracts one from the @@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { - return g.done + return g.doneCtx.Done() } -// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +// Err allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Err() error { - select { - case <-g.Done(): - return ErrTerminate - default: - return nil - } + return g.doneCtx.Err() } -// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +// Value allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Value(key interface{}) interface{} { - return nil + return g.doneCtx.Value(key) } // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context func (g *Manager) Deadline() (deadline time.Time, ok bool) { - return + return g.doneCtx.Deadline() } diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 540974454c..20d9b3905c 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -25,13 +25,21 @@ type Manager struct { forked bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdownCtx context.Context + hammerCtx context.Context + terminateCtx context.Context + doneCtx context.Context + shutdownCtxCancel context.CancelFunc + hammerCtxCancel context.CancelFunc + terminateCtxCancel context.CancelFunc + doneCtxCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { @@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start(ctx context.Context) { - // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) + // Make contexts + g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx) + g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx) + g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx) + g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx) // Set the running state & handle signals g.setState(stateRunning) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 14923c2a9b..51f29778ba 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -36,14 +36,22 @@ type Manager struct { isChild bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdownCtx context.Context + hammerCtx context.Context + terminateCtx context.Context + doneCtx context.Context + shutdownCtxCancel context.CancelFunc + hammerCtxCancel context.CancelFunc + terminateCtxCancel context.CancelFunc + doneCtxCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup shutdownRequested chan struct{} + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { @@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start() { + // Make contexts + g.terminateCtx, g.terminateCtxCancel = context.WithCancel(g.ctx) + g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(g.ctx) + g.hammerCtx, g.hammerCtxCancel = context.WithCancel(g.ctx) + g.doneCtx, g.doneCtxCancel = context.WithCancel(g.ctx) + // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) g.shutdownRequested = make(chan struct{}) // Set the running state @@ -171,7 +181,7 @@ hammerLoop: default: log.Debug("Unexpected control request: %v", change.Cmd) } - case <-g.hammer: + case <-g.hammerCtx.Done(): break hammerLoop } } |