aboutsummaryrefslogtreecommitdiffstats
path: root/modules/graceful
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2021-05-15 15:22:26 +0100
committerGitHub <noreply@github.com>2021-05-15 16:22:26 +0200
commitba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch)
treeddd9ff13b0da7b272b5a60445a997319cb0de882 /modules/graceful
parent9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff)
downloadgitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz
gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/graceful')
-rw-r--r--modules/graceful/context.go23
-rw-r--r--modules/graceful/manager.go154
-rw-r--r--modules/graceful/manager_unix.go26
-rw-r--r--modules/graceful/manager_windows.go28
4 files changed, 121 insertions, 110 deletions
diff --git a/modules/graceful/context.go b/modules/graceful/context.go
index 1ad1109b4e..9d955329a4 100644
--- a/modules/graceful/context.go
+++ b/modules/graceful/context.go
@@ -6,17 +6,9 @@ package graceful
import (
"context"
- "fmt"
"time"
)
-// Errors for context.Err()
-var (
- ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown")
- ErrHammer = fmt.Errorf("Graceful Manager called Hammer")
- ErrTerminate = fmt.Errorf("Graceful Manager called Terminate")
-)
-
// ChannelContext is a context that wraps a channel and error as a context
type ChannelContext struct {
done <-chan struct{}
@@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} {
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
func (g *Manager) ShutdownContext() context.Context {
- return &ChannelContext{
- done: g.IsShutdown(),
- err: ErrShutdown,
- }
+ return g.shutdownCtx
}
// HammerContext returns a context.Context that is Done at hammer
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
func (g *Manager) HammerContext() context.Context {
- return &ChannelContext{
- done: g.IsHammer(),
- err: ErrHammer,
- }
+ return g.hammerCtx
}
// TerminateContext returns a context.Context that is Done at terminate
// Callers using this context should ensure that they are registered as a terminating server
// in order that they are waited for.
func (g *Manager) TerminateContext() context.Context {
- return &ChannelContext{
- done: g.IsTerminate(),
- err: ErrTerminate,
- }
+ return g.terminateCtx
}
diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go
index 903d05ed21..8c3b95c4aa 100644
--- a/modules/graceful/manager.go
+++ b/modules/graceful/manager.go
@@ -54,8 +54,8 @@ func InitManager(ctx context.Context) {
})
}
-// CallbackWithContext is combined runnable and context to watch to see if the caller has finished
-type CallbackWithContext func(ctx context.Context, callback func())
+// WithCallback is a runnable to call when the caller has finished
+type WithCallback func(callback func())
// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
// After the callback to atShutdown is called and is complete, the main function must return.
@@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func())
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
// - users must therefore be careful to only call these as necessary.
// If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate.
-type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func()))
+type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
// After the callback to atShutdown is called and is complete, the main function must return.
@@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
g.doShutdown()
}
}()
- run(func(ctx context.Context, atShutdown func()) {
- go func() {
- select {
- case <-g.IsShutdown():
+ run(func(atShutdown func()) {
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtShutdown = append(g.toRunAtShutdown,
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
+ g.doShutdown()
+ }
+ }()
atShutdown()
- case <-ctx.Done():
- return
- }
- }()
- }, func(ctx context.Context, atTerminate func()) {
- g.RunAtTerminate(ctx, atTerminate)
+ })
+ }, func(atTerminate func()) {
+ g.RunAtTerminate(atTerminate)
})
}
@@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
// (Optionally IsHammer may be waited for instead however, this should be avoided if possible.)
// The callback function provided to atTerminate must return once termination is complete.
// Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary.
-type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext)
+type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback)
// RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks
// After the atShutdown channel is closed, the main function must return once shutdown is complete.
@@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) {
g.doShutdown()
}
}()
- run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) {
- g.RunAtTerminate(ctx, atTerminate)
+ run(g.IsShutdown(), func(atTerminate func()) {
+ g.RunAtTerminate(atTerminate)
})
}
@@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) {
}
// RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination
-func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) {
+func (g *Manager) RunAtTerminate(terminate func()) {
g.terminateWaitGroup.Add(1)
- go func() {
- defer g.terminateWaitGroup.Done()
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
- }
- }()
- select {
- case <-g.IsTerminate():
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtTerminate = append(g.toRunAtTerminate,
+ func() {
+ defer g.terminateWaitGroup.Done()
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
+ }
+ }()
terminate()
- case <-ctx.Done():
- }
- }()
+ })
}
// RunAtShutdown creates a go-routine to run the provided function at shutdown
func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
- go func() {
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtShutdown = append(g.toRunAtShutdown,
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
+ }
+ }()
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ shutdown()
}
- }()
- select {
- case <-g.IsShutdown():
- shutdown()
- case <-ctx.Done():
- }
- }()
+ })
}
// RunAtHammer creates a go-routine to run the provided function at shutdown
-func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) {
- go func() {
- defer func() {
- if err := recover(); err != nil {
- log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
- }
- }()
- select {
- case <-g.IsHammer():
+func (g *Manager) RunAtHammer(hammer func()) {
+ g.lock.Lock()
+ defer g.lock.Unlock()
+ g.toRunAtHammer = append(g.toRunAtHammer,
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
+ }
+ }()
hammer()
- case <-ctx.Done():
- }
- }()
+ })
}
func (g *Manager) doShutdown() {
if !g.setStateTransition(stateRunning, stateShuttingDown) {
return
}
g.lock.Lock()
- close(g.shutdown)
+ g.shutdownCtxCancel()
+ for _, fn := range g.toRunAtShutdown {
+ go fn()
+ }
g.lock.Unlock()
if setting.GracefulHammerTime >= 0 {
@@ -203,7 +212,7 @@ func (g *Manager) doShutdown() {
g.doTerminate()
g.WaitForTerminate()
g.lock.Lock()
- close(g.done)
+ g.doneCtxCancel()
g.lock.Unlock()
}()
}
@@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) {
time.Sleep(d)
g.lock.Lock()
select {
- case <-g.hammer:
+ case <-g.hammerCtx.Done():
default:
log.Warn("Setting Hammer condition")
- close(g.hammer)
+ g.hammerCtxCancel()
+ for _, fn := range g.toRunAtHammer {
+ go fn()
+ }
}
g.lock.Unlock()
}
@@ -226,10 +238,13 @@ func (g *Manager) doTerminate() {
}
g.lock.Lock()
select {
- case <-g.terminate:
+ case <-g.terminateCtx.Done():
default:
log.Warn("Terminating")
- close(g.terminate)
+ g.terminateCtxCancel()
+ for _, fn := range g.toRunAtTerminate {
+ go fn()
+ }
}
g.lock.Unlock()
}
@@ -242,7 +257,7 @@ func (g *Manager) IsChild() bool {
// IsShutdown returns a channel which will be closed at shutdown.
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
func (g *Manager) IsShutdown() <-chan struct{} {
- return g.shutdown
+ return g.shutdownCtx.Done()
}
// IsHammer returns a channel which will be closed at hammer
@@ -250,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} {
// Servers running within the running server wait group should respond to IsHammer
// if not shutdown already
func (g *Manager) IsHammer() <-chan struct{} {
- return g.hammer
+ return g.hammerCtx.Done()
}
// IsTerminate returns a channel which will be closed at terminate
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// IsTerminate will only close once all running servers have stopped
func (g *Manager) IsTerminate() <-chan struct{} {
- return g.terminate
+ return g.terminateCtx.Done()
}
// ServerDone declares a running server done and subtracts one from the
@@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() {
// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating
func (g *Manager) Done() <-chan struct{} {
- return g.done
+ return g.doneCtx.Done()
}
-// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate
+// Err allows the manager to be viewed as a context.Context done at Terminate
func (g *Manager) Err() error {
- select {
- case <-g.Done():
- return ErrTerminate
- default:
- return nil
- }
+ return g.doneCtx.Err()
}
-// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values
+// Value allows the manager to be viewed as a context.Context done at Terminate
func (g *Manager) Value(key interface{}) interface{} {
- return nil
+ return g.doneCtx.Value(key)
}
// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context
func (g *Manager) Deadline() (deadline time.Time, ok bool) {
- return
+ return g.doneCtx.Deadline()
}
diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go
index 540974454c..20d9b3905c 100644
--- a/modules/graceful/manager_unix.go
+++ b/modules/graceful/manager_unix.go
@@ -25,13 +25,21 @@ type Manager struct {
forked bool
lock *sync.RWMutex
state state
- shutdown chan struct{}
- hammer chan struct{}
- terminate chan struct{}
- done chan struct{}
+ shutdownCtx context.Context
+ hammerCtx context.Context
+ terminateCtx context.Context
+ doneCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ hammerCtxCancel context.CancelFunc
+ terminateCtxCancel context.CancelFunc
+ doneCtxCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup
+
+ toRunAtShutdown []func()
+ toRunAtHammer []func()
+ toRunAtTerminate []func()
}
func newGracefulManager(ctx context.Context) *Manager {
@@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager {
}
func (g *Manager) start(ctx context.Context) {
- // Make channels
- g.terminate = make(chan struct{})
- g.shutdown = make(chan struct{})
- g.hammer = make(chan struct{})
- g.done = make(chan struct{})
+ // Make contexts
+ g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx)
+ g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx)
+ g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx)
+ g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx)
// Set the running state & handle signals
g.setState(stateRunning)
diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go
index 14923c2a9b..51f29778ba 100644
--- a/modules/graceful/manager_windows.go
+++ b/modules/graceful/manager_windows.go
@@ -36,14 +36,22 @@ type Manager struct {
isChild bool
lock *sync.RWMutex
state state
- shutdown chan struct{}
- hammer chan struct{}
- terminate chan struct{}
- done chan struct{}
+ shutdownCtx context.Context
+ hammerCtx context.Context
+ terminateCtx context.Context
+ doneCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ hammerCtxCancel context.CancelFunc
+ terminateCtxCancel context.CancelFunc
+ doneCtxCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup
shutdownRequested chan struct{}
+
+ toRunAtShutdown []func()
+ toRunAtHammer []func()
+ toRunAtTerminate []func()
}
func newGracefulManager(ctx context.Context) *Manager {
@@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager {
}
func (g *Manager) start() {
+ // Make contexts
+ g.terminateCtx, g.terminateCtxCancel = context.WithCancel(g.ctx)
+ g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(g.ctx)
+ g.hammerCtx, g.hammerCtxCancel = context.WithCancel(g.ctx)
+ g.doneCtx, g.doneCtxCancel = context.WithCancel(g.ctx)
+
// Make channels
- g.terminate = make(chan struct{})
- g.shutdown = make(chan struct{})
- g.hammer = make(chan struct{})
- g.done = make(chan struct{})
g.shutdownRequested = make(chan struct{})
// Set the running state
@@ -171,7 +181,7 @@ hammerLoop:
default:
log.Debug("Unexpected control request: %v", change.Cmd)
}
- case <-g.hammer:
+ case <-g.hammerCtx.Done():
break hammerLoop
}
}