diff options
author | zeripath <art27@cantab.net> | 2019-12-15 09:51:28 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-15 09:51:28 +0000 |
commit | e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c (patch) | |
tree | 21dcdc6ec138a502590550672ac0a11f364935ea /modules/graceful | |
parent | 8bea92c3dc162e24f6dcc2902dfed5ab94576826 (diff) | |
download | gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.tar.gz gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.zip |
Graceful: Xorm, RepoIndexer, Cron and Others (#9282)
* Change graceful to use a singleton obtained through GetManager instead of a global.
* Graceful: Make TestPullRequests shutdownable
* Graceful: Make the cron tasks graceful
* Graceful: AddTestPullRequest run in graceful ctx
* Graceful: SyncMirrors shutdown
* Graceful: SetDefaultContext for Xorm to be HammerContext
* Avoid starting graceful for migrate commands and checkout
* Graceful: DeliverHooks now can be shutdown
* Fix multiple syncing errors in modules/sync/UniqueQueue & Make UniqueQueue closable
* Begin the process of making the repo indexer shutdown gracefully
Diffstat (limited to 'modules/graceful')
-rw-r--r-- | modules/graceful/context.go | 6 | ||||
-rw-r--r-- | modules/graceful/manager.go | 140 | ||||
-rw-r--r-- | modules/graceful/manager_unix.go | 45 | ||||
-rw-r--r-- | modules/graceful/manager_windows.go | 31 | ||||
-rw-r--r-- | modules/graceful/net_unix.go | 2 | ||||
-rw-r--r-- | modules/graceful/restart_unix.go | 2 | ||||
-rw-r--r-- | modules/graceful/server.go | 6 | ||||
-rw-r--r-- | modules/graceful/server_hooks.go | 6 |
8 files changed, 143 insertions, 95 deletions
diff --git a/modules/graceful/context.go b/modules/graceful/context.go index a4a4df7dea..1ad1109b4e 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -62,7 +62,7 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // ShutdownContext returns a context.Context that is Done at shutdown // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) ShutdownContext() context.Context { +func (g *Manager) ShutdownContext() context.Context { return &ChannelContext{ done: g.IsShutdown(), err: ErrShutdown, @@ -72,7 +72,7 @@ func (g *gracefulManager) ShutdownContext() context.Context { // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) HammerContext() context.Context { +func (g *Manager) HammerContext() context.Context { return &ChannelContext{ done: g.IsHammer(), err: ErrHammer, @@ -82,7 +82,7 @@ func (g *gracefulManager) HammerContext() context.Context { // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. -func (g *gracefulManager) TerminateContext() context.Context { +func (g *Manager) TerminateContext() context.Context { return &ChannelContext{ done: g.IsTerminate(), err: ErrTerminate, diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index b9a56ca9c6..eec675e297 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -6,9 +6,9 @@ package graceful import ( "context" + "sync" "time" - "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" @@ -34,14 +34,24 @@ const ( const numberOfServersToCreate = 3 // Manager represents the graceful server manager interface -var Manager *gracefulManager - -func init() { - Manager = newGracefulManager(context.Background()) - // Set the git default context to the HammerContext - git.DefaultContext = Manager.HammerContext() - // Set the process default context to the HammerContext - process.DefaultContext = Manager.HammerContext() +var manager *Manager + +var initOnce = sync.Once{} + +// GetManager returns the Manager +func GetManager() *Manager { + InitManager(context.Background()) + return manager +} + +// InitManager creates the graceful manager in the provided context +func InitManager(ctx context.Context) { + initOnce.Do(func() { + manager = newGracefulManager(ctx) + + // Set the process default context to the HammerContext + process.DefaultContext = manager.HammerContext() + }) } // CallbackWithContext is combined runnable and context to watch to see if the caller has finished @@ -61,7 +71,7 @@ type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. -func (g *gracefulManager) RunWithShutdownFns(run RunnableWithShutdownFns) { +func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(func(ctx context.Context, atShutdown func()) { @@ -90,7 +100,7 @@ type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate Callb // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { +func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { @@ -101,14 +111,14 @@ func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { // RunWithShutdownContext takes a function that has a context to watch for shutdown. // After the provided context is Done(), the main function must return once shutdown is complete. // (Optionally the HammerContext may be obtained and waited for however, this should be avoided if possible.) -func (g *gracefulManager) RunWithShutdownContext(run func(context.Context)) { +func (g *Manager) RunWithShutdownContext(run func(context.Context)) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.ShutdownContext()) } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { g.terminateWaitGroup.Add(1) go func() { select { @@ -121,7 +131,7 @@ func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) } // RunAtShutdown creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { +func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { go func() { select { case <-g.IsShutdown(): @@ -132,7 +142,7 @@ func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { +func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { go func() { select { case <-g.IsHammer(): @@ -141,7 +151,7 @@ func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { } }() } -func (g *gracefulManager) doShutdown() { +func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } @@ -158,48 +168,47 @@ func (g *gracefulManager) doShutdown() { g.doHammerTime(0) <-time.After(1 * time.Second) g.doTerminate() + g.WaitForTerminate() + g.lock.Lock() + close(g.done) + g.lock.Unlock() }() } -func (g *gracefulManager) doHammerTime(d time.Duration) { +func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) + g.lock.Lock() select { case <-g.hammer: default: log.Warn("Setting Hammer condition") close(g.hammer) } - + g.lock.Unlock() } -func (g *gracefulManager) doTerminate() { +func (g *Manager) doTerminate() { if !g.setStateTransition(stateShuttingDown, stateTerminate) { return } g.lock.Lock() - close(g.terminate) + select { + case <-g.terminate: + default: + log.Warn("Terminating") + close(g.terminate) + } g.lock.Unlock() } // IsChild returns if the current process is a child of previous Gitea process -func (g *gracefulManager) IsChild() bool { +func (g *Manager) IsChild() bool { return g.isChild } // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate -func (g *gracefulManager) IsShutdown() <-chan struct{} { - g.lock.RLock() - if g.shutdown == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.shutdown == nil { - g.shutdown = make(chan struct{}) - } - defer g.lock.Unlock() - return g.shutdown - } - defer g.lock.RUnlock() +func (g *Manager) IsShutdown() <-chan struct{} { return g.shutdown } @@ -207,65 +216,43 @@ func (g *gracefulManager) IsShutdown() <-chan struct{} { // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // Servers running within the running server wait group should respond to IsHammer // if not shutdown already -func (g *gracefulManager) IsHammer() <-chan struct{} { - g.lock.RLock() - if g.hammer == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.hammer == nil { - g.hammer = make(chan struct{}) - } - defer g.lock.Unlock() - return g.hammer - } - defer g.lock.RUnlock() +func (g *Manager) IsHammer() <-chan struct{} { return g.hammer } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped -func (g *gracefulManager) IsTerminate() <-chan struct{} { - g.lock.RLock() - if g.terminate == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.terminate == nil { - g.terminate = make(chan struct{}) - } - defer g.lock.Unlock() - return g.terminate - } - defer g.lock.RUnlock() +func (g *Manager) IsTerminate() <-chan struct{} { return g.terminate } // ServerDone declares a running server done and subtracts one from the // running server wait group. Users probably do not want to call this // and should use one of the RunWithShutdown* functions -func (g *gracefulManager) ServerDone() { +func (g *Manager) ServerDone() { g.runningServerWaitGroup.Done() } // WaitForServers waits for all running servers to finish. Users should probably // instead use AtTerminate or IsTerminate -func (g *gracefulManager) WaitForServers() { +func (g *Manager) WaitForServers() { g.runningServerWaitGroup.Wait() } // WaitForTerminate waits for all terminating actions to finish. // Only the main go-routine should use this -func (g *gracefulManager) WaitForTerminate() { +func (g *Manager) WaitForTerminate() { g.terminateWaitGroup.Wait() } -func (g *gracefulManager) getState() state { +func (g *Manager) getState() state { g.lock.RLock() defer g.lock.RUnlock() return g.state } -func (g *gracefulManager) setStateTransition(old, new state) bool { +func (g *Manager) setStateTransition(old, new state) bool { if old != g.getState() { return false } @@ -279,7 +266,7 @@ func (g *gracefulManager) setStateTransition(old, new state) bool { return true } -func (g *gracefulManager) setState(st state) { +func (g *Manager) setState(st state) { g.lock.Lock() defer g.lock.Unlock() @@ -288,6 +275,31 @@ func (g *gracefulManager) setState(st state) { // InformCleanup tells the cleanup wait group that we have either taken a listener // or will not be taking a listener -func (g *gracefulManager) InformCleanup() { +func (g *Manager) InformCleanup() { g.createServerWaitGroup.Done() } + +// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating +func (g *Manager) Done() <-chan struct{} { + return g.done +} + +// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +func (g *Manager) Err() error { + select { + case <-g.Done(): + return ErrTerminate + default: + return nil + } +} + +// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +func (g *Manager) Value(key interface{}) interface{} { + return nil +} + +// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context +func (g *Manager) Deadline() (deadline time.Time, ok bool) { + return +} diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 1ffc59f0df..323c6a4111 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -19,7 +19,8 @@ import ( "code.gitea.io/gitea/modules/setting" ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { isChild bool forked bool lock *sync.RWMutex @@ -27,27 +28,37 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1, lock: &sync.RWMutex{}, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run(ctx) + manager.start(ctx) return manager } -func (g *gracefulManager) Run(ctx context.Context) { +func (g *Manager) start(ctx context.Context) { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state & handle signals g.setState(stateRunning) go g.handleSignals(ctx) - c := make(chan struct{}) + + // Handle clean up of unused provided listeners and delayed start-up + startupDone := make(chan struct{}) go func() { - defer close(c) + defer close(startupDone) // Wait till we're done getting all of the listeners and then close // the unused ones g.createServerWaitGroup.Wait() @@ -58,9 +69,19 @@ func (g *gracefulManager) Run(ctx context.Context) { if setting.StartupTimeout > 0 { go func() { select { - case <-c: + case <-startupDone: return case <-g.IsShutdown(): + func() { + // When waitgroup counter goes negative it will panic - we don't care about this so we can just ignore it. + defer func() { + _ = recover() + }() + // Ensure that the createServerWaitGroup stops waiting + for { + g.createServerWaitGroup.Done() + } + }() return case <-time.After(setting.StartupTimeout): log.Error("Startup took too long! Shutting down") @@ -70,7 +91,7 @@ func (g *gracefulManager) Run(ctx context.Context) { } } -func (g *gracefulManager) handleSignals(ctx context.Context) { +func (g *Manager) handleSignals(ctx context.Context) { signalChannel := make(chan os.Signal, 1) signal.Notify( @@ -123,7 +144,7 @@ func (g *gracefulManager) handleSignals(ctx context.Context) { } } -func (g *gracefulManager) doFork() error { +func (g *Manager) doFork() error { g.lock.Lock() if g.forked { g.lock.Unlock() @@ -139,7 +160,9 @@ func (g *gracefulManager) doFork() error { return err } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die. +// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { KillParent() g.runningServerWaitGroup.Add(1) } diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 26c791e6ed..526fc0bd24 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -30,7 +30,8 @@ const ( acceptHammerCode = svc.Accepted(hammerCode) ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { ctx context.Context isChild bool lock *sync.RWMutex @@ -38,27 +39,37 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: false, lock: &sync.RWMutex{}, ctx: ctx, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run() + manager.start() return manager } -func (g *gracefulManager) Run() { +func (g *Manager) start() { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state g.setState(stateRunning) if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip { return } + + // Make SVC process run := svc.Run isInteractive, err := svc.IsAnInteractiveSession() if err != nil { @@ -71,8 +82,8 @@ func (g *gracefulManager) Run() { go run(WindowsServiceName, g) } -// Execute makes gracefulManager implement svc.Handler -func (g *gracefulManager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { +// Execute makes Manager implement svc.Handler +func (g *Manager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { if setting.StartupTimeout > 0 { status <- svc.Status{State: svc.StartPending} } else { @@ -141,11 +152,13 @@ hammerLoop: return false, 0 } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server. +// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { g.runningServerWaitGroup.Add(1) } -func (g *gracefulManager) awaitServer(limit time.Duration) bool { +func (g *Manager) awaitServer(limit time.Duration) bool { c := make(chan struct{}) go func() { defer close(c) diff --git a/modules/graceful/net_unix.go b/modules/graceful/net_unix.go index 5550c09f42..1e496e9d91 100644 --- a/modules/graceful/net_unix.go +++ b/modules/graceful/net_unix.go @@ -101,7 +101,7 @@ func CloseProvidedListeners() error { // creates a new one using net.Listen. func GetListener(network, address string) (net.Listener, error) { // Add a deferral to say that we've tried to grab a listener - defer Manager.InformCleanup() + defer GetManager().InformCleanup() switch network { case "tcp", "tcp4", "tcp6": tcpAddr, err := net.ResolveTCPAddr(network, address) diff --git a/modules/graceful/restart_unix.go b/modules/graceful/restart_unix.go index 3fc4f0511d..9a94e5fa67 100644 --- a/modules/graceful/restart_unix.go +++ b/modules/graceful/restart_unix.go @@ -22,7 +22,7 @@ var killParent sync.Once // KillParent sends the kill signal to the parent process if we are a child func KillParent() { killParent.Do(func() { - if Manager.IsChild() { + if GetManager().IsChild() { ppid := syscall.Getppid() if ppid > 1 { _ = syscall.Kill(ppid, syscall.SIGTERM) diff --git a/modules/graceful/server.go b/modules/graceful/server.go index c6692cbb75..30fb8cdffa 100644 --- a/modules/graceful/server.go +++ b/modules/graceful/server.go @@ -47,7 +47,7 @@ type Server struct { // NewServer creates a server on network at provided address func NewServer(network, address string) *Server { - if Manager.IsChild() { + if GetManager().IsChild() { log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid()) } else { log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid()) @@ -138,12 +138,12 @@ func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFun func (srv *Server) Serve(serve ServeFunction) error { defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid()) srv.setState(stateRunning) - Manager.RegisterServer() + GetManager().RegisterServer() err := serve(srv.listener) log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) srv.wg.Wait() srv.setState(stateTerminate) - Manager.ServerDone() + GetManager().ServerDone() // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil if err != nil && strings.Contains(err.Error(), "use of closed") { return nil diff --git a/modules/graceful/server_hooks.go b/modules/graceful/server_hooks.go index 74b0fcb885..c634905711 100644 --- a/modules/graceful/server_hooks.go +++ b/modules/graceful/server_hooks.go @@ -14,15 +14,15 @@ import ( // awaitShutdown waits for the shutdown signal from the Manager func (srv *Server) awaitShutdown() { select { - case <-Manager.IsShutdown(): + case <-GetManager().IsShutdown(): // Shutdown srv.doShutdown() - case <-Manager.IsHammer(): + case <-GetManager().IsHammer(): // Hammer srv.doShutdown() srv.doHammer() } - <-Manager.IsHammer() + <-GetManager().IsHammer() srv.doHammer() } |