diff options
Diffstat (limited to 'modules')
-rw-r--r-- | modules/cron/cron.go | 9 | ||||
-rw-r--r-- | modules/git/git.go | 3 | ||||
-rw-r--r-- | modules/git/git_test.go | 3 | ||||
-rw-r--r-- | modules/graceful/context.go | 6 | ||||
-rw-r--r-- | modules/graceful/manager.go | 140 | ||||
-rw-r--r-- | modules/graceful/manager_unix.go | 45 | ||||
-rw-r--r-- | modules/graceful/manager_windows.go | 31 | ||||
-rw-r--r-- | modules/graceful/net_unix.go | 2 | ||||
-rw-r--r-- | modules/graceful/restart_unix.go | 2 | ||||
-rw-r--r-- | modules/graceful/server.go | 6 | ||||
-rw-r--r-- | modules/graceful/server_hooks.go | 6 | ||||
-rw-r--r-- | modules/indexer/code/bleve.go | 82 | ||||
-rw-r--r-- | modules/indexer/code/repo.go | 35 | ||||
-rw-r--r-- | modules/indexer/issues/indexer.go | 2 | ||||
-rw-r--r-- | modules/migrations/update.go | 26 | ||||
-rw-r--r-- | modules/ssh/ssh_graceful.go | 2 | ||||
-rw-r--r-- | modules/sync/unique_queue.go | 56 | ||||
-rw-r--r-- | modules/webhook/deliver.go | 63 |
18 files changed, 353 insertions, 166 deletions
diff --git a/modules/cron/cron.go b/modules/cron/cron.go index 795fafb51f..f4511a8e79 100644 --- a/modules/cron/cron.go +++ b/modules/cron/cron.go @@ -6,9 +6,11 @@ package cron import ( + "context" "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations" "code.gitea.io/gitea/modules/setting" @@ -37,17 +39,19 @@ var taskStatusTable = sync.NewStatusTable() type Func func() // WithUnique wrap a cron func with an unique running check -func WithUnique(name string, body Func) Func { +func WithUnique(name string, body func(context.Context)) Func { return func() { if !taskStatusTable.StartIfNotRunning(name) { return } defer taskStatusTable.Stop(name) - body() + graceful.GetManager().RunWithShutdownContext(body) } } // NewContext begins cron tasks +// Each cron task is run within the shutdown context as a running server +// AtShutdown the cron server is stopped func NewContext() { var ( entry *cron.Entry @@ -129,6 +133,7 @@ func NewContext() { go WithUnique(updateMigrationPosterID, migrations.UpdateMigrationPosterID)() c.Start() + graceful.GetManager().RunAtShutdown(context.Background(), c.Stop) } // ListTasks returns all running cron tasks. diff --git a/modules/git/git.go b/modules/git/git.go index 286e1ad8b4..d5caaa0912 100644 --- a/modules/git/git.go +++ b/modules/git/git.go @@ -106,7 +106,8 @@ func SetExecutablePath(path string) error { } // Init initializes git module -func Init() error { +func Init(ctx context.Context) error { + DefaultContext = ctx // Git requires setting user.name and user.email in order to commit changes. for configKey, defaultValue := range map[string]string{"user.name": "Gitea", "user.email": "gitea@fake.local"} { if stdout, stderr, err := process.GetManager().Exec("git.Init(get setting)", GitExecutable, "config", "--get", configKey); err != nil || strings.TrimSpace(stdout) == "" { diff --git a/modules/git/git_test.go b/modules/git/git_test.go index 0c6259a9c5..27951d639b 100644 --- a/modules/git/git_test.go +++ b/modules/git/git_test.go @@ -5,6 +5,7 @@ package git import ( + "context" "fmt" "os" "testing" @@ -16,7 +17,7 @@ func fatalTestError(fmtStr string, args ...interface{}) { } func TestMain(m *testing.M) { - if err := Init(); err != nil { + if err := Init(context.Background()); err != nil { fatalTestError("Init failed: %v", err) } diff --git a/modules/graceful/context.go b/modules/graceful/context.go index a4a4df7dea..1ad1109b4e 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -62,7 +62,7 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // ShutdownContext returns a context.Context that is Done at shutdown // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) ShutdownContext() context.Context { +func (g *Manager) ShutdownContext() context.Context { return &ChannelContext{ done: g.IsShutdown(), err: ErrShutdown, @@ -72,7 +72,7 @@ func (g *gracefulManager) ShutdownContext() context.Context { // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. -func (g *gracefulManager) HammerContext() context.Context { +func (g *Manager) HammerContext() context.Context { return &ChannelContext{ done: g.IsHammer(), err: ErrHammer, @@ -82,7 +82,7 @@ func (g *gracefulManager) HammerContext() context.Context { // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. -func (g *gracefulManager) TerminateContext() context.Context { +func (g *Manager) TerminateContext() context.Context { return &ChannelContext{ done: g.IsTerminate(), err: ErrTerminate, diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index b9a56ca9c6..eec675e297 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -6,9 +6,9 @@ package graceful import ( "context" + "sync" "time" - "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" @@ -34,14 +34,24 @@ const ( const numberOfServersToCreate = 3 // Manager represents the graceful server manager interface -var Manager *gracefulManager - -func init() { - Manager = newGracefulManager(context.Background()) - // Set the git default context to the HammerContext - git.DefaultContext = Manager.HammerContext() - // Set the process default context to the HammerContext - process.DefaultContext = Manager.HammerContext() +var manager *Manager + +var initOnce = sync.Once{} + +// GetManager returns the Manager +func GetManager() *Manager { + InitManager(context.Background()) + return manager +} + +// InitManager creates the graceful manager in the provided context +func InitManager(ctx context.Context) { + initOnce.Do(func() { + manager = newGracefulManager(ctx) + + // Set the process default context to the HammerContext + process.DefaultContext = manager.HammerContext() + }) } // CallbackWithContext is combined runnable and context to watch to see if the caller has finished @@ -61,7 +71,7 @@ type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. -func (g *gracefulManager) RunWithShutdownFns(run RunnableWithShutdownFns) { +func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(func(ctx context.Context, atShutdown func()) { @@ -90,7 +100,7 @@ type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate Callb // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { +func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { @@ -101,14 +111,14 @@ func (g *gracefulManager) RunWithShutdownChan(run RunnableWithShutdownChan) { // RunWithShutdownContext takes a function that has a context to watch for shutdown. // After the provided context is Done(), the main function must return once shutdown is complete. // (Optionally the HammerContext may be obtained and waited for however, this should be avoided if possible.) -func (g *gracefulManager) RunWithShutdownContext(run func(context.Context)) { +func (g *Manager) RunWithShutdownContext(run func(context.Context)) { g.runningServerWaitGroup.Add(1) defer g.runningServerWaitGroup.Done() run(g.ShutdownContext()) } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { g.terminateWaitGroup.Add(1) go func() { select { @@ -121,7 +131,7 @@ func (g *gracefulManager) RunAtTerminate(ctx context.Context, terminate func()) } // RunAtShutdown creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { +func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { go func() { select { case <-g.IsShutdown(): @@ -132,7 +142,7 @@ func (g *gracefulManager) RunAtShutdown(ctx context.Context, shutdown func()) { } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { +func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { go func() { select { case <-g.IsHammer(): @@ -141,7 +151,7 @@ func (g *gracefulManager) RunAtHammer(ctx context.Context, hammer func()) { } }() } -func (g *gracefulManager) doShutdown() { +func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } @@ -158,48 +168,47 @@ func (g *gracefulManager) doShutdown() { g.doHammerTime(0) <-time.After(1 * time.Second) g.doTerminate() + g.WaitForTerminate() + g.lock.Lock() + close(g.done) + g.lock.Unlock() }() } -func (g *gracefulManager) doHammerTime(d time.Duration) { +func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) + g.lock.Lock() select { case <-g.hammer: default: log.Warn("Setting Hammer condition") close(g.hammer) } - + g.lock.Unlock() } -func (g *gracefulManager) doTerminate() { +func (g *Manager) doTerminate() { if !g.setStateTransition(stateShuttingDown, stateTerminate) { return } g.lock.Lock() - close(g.terminate) + select { + case <-g.terminate: + default: + log.Warn("Terminating") + close(g.terminate) + } g.lock.Unlock() } // IsChild returns if the current process is a child of previous Gitea process -func (g *gracefulManager) IsChild() bool { +func (g *Manager) IsChild() bool { return g.isChild } // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate -func (g *gracefulManager) IsShutdown() <-chan struct{} { - g.lock.RLock() - if g.shutdown == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.shutdown == nil { - g.shutdown = make(chan struct{}) - } - defer g.lock.Unlock() - return g.shutdown - } - defer g.lock.RUnlock() +func (g *Manager) IsShutdown() <-chan struct{} { return g.shutdown } @@ -207,65 +216,43 @@ func (g *gracefulManager) IsShutdown() <-chan struct{} { // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // Servers running within the running server wait group should respond to IsHammer // if not shutdown already -func (g *gracefulManager) IsHammer() <-chan struct{} { - g.lock.RLock() - if g.hammer == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.hammer == nil { - g.hammer = make(chan struct{}) - } - defer g.lock.Unlock() - return g.hammer - } - defer g.lock.RUnlock() +func (g *Manager) IsHammer() <-chan struct{} { return g.hammer } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped -func (g *gracefulManager) IsTerminate() <-chan struct{} { - g.lock.RLock() - if g.terminate == nil { - g.lock.RUnlock() - g.lock.Lock() - if g.terminate == nil { - g.terminate = make(chan struct{}) - } - defer g.lock.Unlock() - return g.terminate - } - defer g.lock.RUnlock() +func (g *Manager) IsTerminate() <-chan struct{} { return g.terminate } // ServerDone declares a running server done and subtracts one from the // running server wait group. Users probably do not want to call this // and should use one of the RunWithShutdown* functions -func (g *gracefulManager) ServerDone() { +func (g *Manager) ServerDone() { g.runningServerWaitGroup.Done() } // WaitForServers waits for all running servers to finish. Users should probably // instead use AtTerminate or IsTerminate -func (g *gracefulManager) WaitForServers() { +func (g *Manager) WaitForServers() { g.runningServerWaitGroup.Wait() } // WaitForTerminate waits for all terminating actions to finish. // Only the main go-routine should use this -func (g *gracefulManager) WaitForTerminate() { +func (g *Manager) WaitForTerminate() { g.terminateWaitGroup.Wait() } -func (g *gracefulManager) getState() state { +func (g *Manager) getState() state { g.lock.RLock() defer g.lock.RUnlock() return g.state } -func (g *gracefulManager) setStateTransition(old, new state) bool { +func (g *Manager) setStateTransition(old, new state) bool { if old != g.getState() { return false } @@ -279,7 +266,7 @@ func (g *gracefulManager) setStateTransition(old, new state) bool { return true } -func (g *gracefulManager) setState(st state) { +func (g *Manager) setState(st state) { g.lock.Lock() defer g.lock.Unlock() @@ -288,6 +275,31 @@ func (g *gracefulManager) setState(st state) { // InformCleanup tells the cleanup wait group that we have either taken a listener // or will not be taking a listener -func (g *gracefulManager) InformCleanup() { +func (g *Manager) InformCleanup() { g.createServerWaitGroup.Done() } + +// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating +func (g *Manager) Done() <-chan struct{} { + return g.done +} + +// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +func (g *Manager) Err() error { + select { + case <-g.Done(): + return ErrTerminate + default: + return nil + } +} + +// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +func (g *Manager) Value(key interface{}) interface{} { + return nil +} + +// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context +func (g *Manager) Deadline() (deadline time.Time, ok bool) { + return +} diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 1ffc59f0df..323c6a4111 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -19,7 +19,8 @@ import ( "code.gitea.io/gitea/modules/setting" ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { isChild bool forked bool lock *sync.RWMutex @@ -27,27 +28,37 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1, lock: &sync.RWMutex{}, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run(ctx) + manager.start(ctx) return manager } -func (g *gracefulManager) Run(ctx context.Context) { +func (g *Manager) start(ctx context.Context) { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state & handle signals g.setState(stateRunning) go g.handleSignals(ctx) - c := make(chan struct{}) + + // Handle clean up of unused provided listeners and delayed start-up + startupDone := make(chan struct{}) go func() { - defer close(c) + defer close(startupDone) // Wait till we're done getting all of the listeners and then close // the unused ones g.createServerWaitGroup.Wait() @@ -58,9 +69,19 @@ func (g *gracefulManager) Run(ctx context.Context) { if setting.StartupTimeout > 0 { go func() { select { - case <-c: + case <-startupDone: return case <-g.IsShutdown(): + func() { + // When waitgroup counter goes negative it will panic - we don't care about this so we can just ignore it. + defer func() { + _ = recover() + }() + // Ensure that the createServerWaitGroup stops waiting + for { + g.createServerWaitGroup.Done() + } + }() return case <-time.After(setting.StartupTimeout): log.Error("Startup took too long! Shutting down") @@ -70,7 +91,7 @@ func (g *gracefulManager) Run(ctx context.Context) { } } -func (g *gracefulManager) handleSignals(ctx context.Context) { +func (g *Manager) handleSignals(ctx context.Context) { signalChannel := make(chan os.Signal, 1) signal.Notify( @@ -123,7 +144,7 @@ func (g *gracefulManager) handleSignals(ctx context.Context) { } } -func (g *gracefulManager) doFork() error { +func (g *Manager) doFork() error { g.lock.Lock() if g.forked { g.lock.Unlock() @@ -139,7 +160,9 @@ func (g *gracefulManager) doFork() error { return err } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die. +// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { KillParent() g.runningServerWaitGroup.Add(1) } diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 26c791e6ed..526fc0bd24 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -30,7 +30,8 @@ const ( acceptHammerCode = svc.Accepted(hammerCode) ) -type gracefulManager struct { +// Manager manages the graceful shutdown process +type Manager struct { ctx context.Context isChild bool lock *sync.RWMutex @@ -38,27 +39,37 @@ type gracefulManager struct { shutdown chan struct{} hammer chan struct{} terminate chan struct{} + done chan struct{} runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup } -func newGracefulManager(ctx context.Context) *gracefulManager { - manager := &gracefulManager{ +func newGracefulManager(ctx context.Context) *Manager { + manager := &Manager{ isChild: false, lock: &sync.RWMutex{}, ctx: ctx, } manager.createServerWaitGroup.Add(numberOfServersToCreate) - manager.Run() + manager.start() return manager } -func (g *gracefulManager) Run() { +func (g *Manager) start() { + // Make channels + g.terminate = make(chan struct{}) + g.shutdown = make(chan struct{}) + g.hammer = make(chan struct{}) + g.done = make(chan struct{}) + + // Set the running state g.setState(stateRunning) if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip { return } + + // Make SVC process run := svc.Run isInteractive, err := svc.IsAnInteractiveSession() if err != nil { @@ -71,8 +82,8 @@ func (g *gracefulManager) Run() { go run(WindowsServiceName, g) } -// Execute makes gracefulManager implement svc.Handler -func (g *gracefulManager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { +// Execute makes Manager implement svc.Handler +func (g *Manager) Execute(args []string, changes <-chan svc.ChangeRequest, status chan<- svc.Status) (svcSpecificEC bool, exitCode uint32) { if setting.StartupTimeout > 0 { status <- svc.Status{State: svc.StartPending} } else { @@ -141,11 +152,13 @@ hammerLoop: return false, 0 } -func (g *gracefulManager) RegisterServer() { +// RegisterServer registers the running of a listening server. +// Any call to RegisterServer must be matched by a call to ServerDone +func (g *Manager) RegisterServer() { g.runningServerWaitGroup.Add(1) } -func (g *gracefulManager) awaitServer(limit time.Duration) bool { +func (g *Manager) awaitServer(limit time.Duration) bool { c := make(chan struct{}) go func() { defer close(c) diff --git a/modules/graceful/net_unix.go b/modules/graceful/net_unix.go index 5550c09f42..1e496e9d91 100644 --- a/modules/graceful/net_unix.go +++ b/modules/graceful/net_unix.go @@ -101,7 +101,7 @@ func CloseProvidedListeners() error { // creates a new one using net.Listen. func GetListener(network, address string) (net.Listener, error) { // Add a deferral to say that we've tried to grab a listener - defer Manager.InformCleanup() + defer GetManager().InformCleanup() switch network { case "tcp", "tcp4", "tcp6": tcpAddr, err := net.ResolveTCPAddr(network, address) diff --git a/modules/graceful/restart_unix.go b/modules/graceful/restart_unix.go index 3fc4f0511d..9a94e5fa67 100644 --- a/modules/graceful/restart_unix.go +++ b/modules/graceful/restart_unix.go @@ -22,7 +22,7 @@ var killParent sync.Once // KillParent sends the kill signal to the parent process if we are a child func KillParent() { killParent.Do(func() { - if Manager.IsChild() { + if GetManager().IsChild() { ppid := syscall.Getppid() if ppid > 1 { _ = syscall.Kill(ppid, syscall.SIGTERM) diff --git a/modules/graceful/server.go b/modules/graceful/server.go index c6692cbb75..30fb8cdffa 100644 --- a/modules/graceful/server.go +++ b/modules/graceful/server.go @@ -47,7 +47,7 @@ type Server struct { // NewServer creates a server on network at provided address func NewServer(network, address string) *Server { - if Manager.IsChild() { + if GetManager().IsChild() { log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid()) } else { log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid()) @@ -138,12 +138,12 @@ func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFun func (srv *Server) Serve(serve ServeFunction) error { defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid()) srv.setState(stateRunning) - Manager.RegisterServer() + GetManager().RegisterServer() err := serve(srv.listener) log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) srv.wg.Wait() srv.setState(stateTerminate) - Manager.ServerDone() + GetManager().ServerDone() // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil if err != nil && strings.Contains(err.Error(), "use of closed") { return nil diff --git a/modules/graceful/server_hooks.go b/modules/graceful/server_hooks.go index 74b0fcb885..c634905711 100644 --- a/modules/graceful/server_hooks.go +++ b/modules/graceful/server_hooks.go @@ -14,15 +14,15 @@ import ( // awaitShutdown waits for the shutdown signal from the Manager func (srv *Server) awaitShutdown() { select { - case <-Manager.IsShutdown(): + case <-GetManager().IsShutdown(): // Shutdown srv.doShutdown() - case <-Manager.IsHammer(): + case <-GetManager().IsHammer(): // Hammer srv.doShutdown() srv.doHammer() } - <-Manager.IsHammer() + <-GetManager().IsHammer() srv.doHammer() } diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index c8e1bb1879..bb2fc5bc74 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -6,6 +6,7 @@ package code import ( "fmt" + "os" "strconv" "strings" "time" @@ -34,10 +35,11 @@ func InitRepoIndexer() { return } waitChannel := make(chan time.Duration) + // FIXME: graceful: This should use a persistable queue repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) go func() { start := time.Now() - log.Info("Initializing Repository Indexer") + log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) initRepoIndexer(populateRepoIndexerAsynchronously) go processRepoIndexerOperationQueue() waitChannel <- time.Since(start) @@ -45,7 +47,7 @@ func InitRepoIndexer() { if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { @@ -70,13 +72,6 @@ func populateRepoIndexerAsynchronously() error { return nil } - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - return err - } - var maxRepoID int64 if maxRepoID, err = models.GetMaxID("repository"); err != nil { return err @@ -87,44 +82,59 @@ func populateRepoIndexerAsynchronously() error { // populateRepoIndexer populate the repo indexer with pre-existing data. This // should only be run when the indexer is created for the first time. +// FIXME: graceful: This should use a persistable queue func populateRepoIndexer(maxRepoID int64) { log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + // start with the maximum existing repo ID and work backwards, so that we // don't include repos that are created after gitea starts; such repos will // already be added to the indexer, and we don't need to add them again. for maxRepoID > 0 { - repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) - err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) if err != nil { log.Error("populateRepoIndexer: %v", err) return - } else if len(repos) == 0 { + } else if len(ids) == 0 { break } - for _, repo := range repos { + for _, id := range ids { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: repo.ID, + repoID: id, deleted: false, } - maxRepoID = repo.ID - 1 + maxRepoID = id - 1 } } - log.Info("Done populating the repo indexer with existing repositories") + log.Info("Done (re)populating the repo indexer with existing repositories") } func updateRepoIndexer(repoID int64) error { repo, err := models.GetRepositoryByID(repoID) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) } sha, err := getDefaultBranchSha(repo) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) } changes, err := getRepoChanges(repo, sha) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) } else if changes == nil { return nil } @@ -132,16 +142,16 @@ func updateRepoIndexer(repoID int64) error { batch := RepoIndexerBatch() for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) } } for _, filename := range changes.RemovedFilenames { if err := addDelete(filename, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) } } if err = batch.Flush(); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } return repo.UpdateIndexerStatus(sha) } @@ -322,20 +332,26 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, func processRepoIndexerOperationQueue() { for { - op := <-repoIndexerOperationQueue - var err error - if op.deleted { - if err = deleteRepoFromIndexer(op.repoID); err != nil { - log.Error("deleteRepoFromIndexer: %v", err) + select { + case op := <-repoIndexerOperationQueue: + var err error + if op.deleted { + if err = deleteRepoFromIndexer(op.repoID); err != nil { + log.Error("DeleteRepoFromIndexer: %v", err) + } + } else { + if err = updateRepoIndexer(op.repoID); err != nil { + log.Error("updateRepoIndexer: %v", err) + } } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) + for _, watcher := range op.watchers { + watcher <- err } + case <-graceful.GetManager().IsShutdown(): + log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) + return } - for _, watcher := range op.watchers { - watcher <- err - } + } } diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go index 31f0fa7f3d..bc5f317b7d 100644 --- a/modules/indexer/code/repo.go +++ b/modules/indexer/code/repo.go @@ -5,9 +5,13 @@ package code import ( + "context" + "os" "strings" "sync" + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -104,21 +108,50 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) func initRepoIndexer(populateIndexer func() error) { indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { - log.Fatal("InitRepoIndexer: %v", err) + log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err) } if indexer != nil { indexerHolder.set(indexer) + closeAtTerminate() + + // Continue population from where left off + if err = populateIndexer(); err != nil { + log.Fatal("PopulateRepoIndex: %v", err) + } return } if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { log.Fatal("CreateRepoIndexer: %v", err) } + closeAtTerminate() + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("DeleteAllRepoIndexerStatus: %v", err) + } + if err = populateIndexer(); err != nil { log.Fatal("PopulateRepoIndex: %v", err) } } +func closeAtTerminate() { + graceful.GetManager().RunAtTerminate(context.Background(), func() { + log.Debug("Closing repo indexer") + indexer := indexerHolder.get() + if indexer != nil { + err := indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) + } + } + log.Info("PID: %d Repository Indexer closed", os.Getpid()) + }) +} + // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) error { docMapping := bleve.NewDocumentMapping() diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 78eba58095..50b8d6d224 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -172,7 +172,7 @@ func InitIssueIndexer(syncReindex bool) { } else if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { diff --git a/modules/migrations/update.go b/modules/migrations/update.go index d1465b2baf..3d0962657c 100644 --- a/modules/migrations/update.go +++ b/modules/migrations/update.go @@ -5,21 +5,28 @@ package migrations import ( + "context" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/structs" ) // UpdateMigrationPosterID updates all migrated repositories' issues and comments posterID -func UpdateMigrationPosterID() { +func UpdateMigrationPosterID(ctx context.Context) { for _, gitService := range structs.SupportedFullGitService { - if err := updateMigrationPosterIDByGitService(gitService); err != nil { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterID aborted due to shutdown before %s", gitService.Name()) + default: + } + if err := updateMigrationPosterIDByGitService(ctx, gitService); err != nil { log.Error("updateMigrationPosterIDByGitService failed: %v", err) } } } -func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { +func updateMigrationPosterIDByGitService(ctx context.Context, tp structs.GitServiceType) error { provider := tp.Name() if len(provider) == 0 { return nil @@ -28,6 +35,13 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { const batchSize = 100 var start int for { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name()) + return nil + default: + } + users, err := models.FindExternalUsersByProvider(models.FindExternalUserOptions{ Provider: provider, Start: start, @@ -38,6 +52,12 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error { } for _, user := range users { + select { + case <-ctx.Done(): + log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name()) + return nil + default: + } externalUserID := user.ExternalID if err := models.UpdateMigrationsByType(tp, externalUserID, user.UserID); err != nil { log.Error("UpdateMigrationsByType type %s external user id %v to local user id %v failed: %v", tp.Name(), user.ExternalID, user.UserID, err) diff --git a/modules/ssh/ssh_graceful.go b/modules/ssh/ssh_graceful.go index 4d7557e2ee..f8370ab4db 100644 --- a/modules/ssh/ssh_graceful.go +++ b/modules/ssh/ssh_graceful.go @@ -24,5 +24,5 @@ func listen(server *ssh.Server) { // Unused informs our cleanup routine that we will not be using a ssh port func Unused() { - graceful.Manager.InformCleanup() + graceful.GetManager().InformCleanup() } diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go index de694d8560..14644c7d4e 100644 --- a/modules/sync/unique_queue.go +++ b/modules/sync/unique_queue.go @@ -1,4 +1,5 @@ // Copyright 2016 The Gogs Authors. All rights reserved. +// Copyright 2019 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. @@ -15,8 +16,9 @@ import ( // This queue is particularly useful for preventing duplicated task // of same purpose. type UniqueQueue struct { - table *StatusTable - queue chan string + table *StatusTable + queue chan string + closed chan struct{} } // NewUniqueQueue initializes and returns a new UniqueQueue object. @@ -26,11 +28,43 @@ func NewUniqueQueue(queueLength int) *UniqueQueue { } return &UniqueQueue{ - table: NewStatusTable(), - queue: make(chan string, queueLength), + table: NewStatusTable(), + queue: make(chan string, queueLength), + closed: make(chan struct{}), } } +// Close closes this queue +func (q *UniqueQueue) Close() { + select { + case <-q.closed: + default: + q.table.lock.Lock() + select { + case <-q.closed: + default: + close(q.closed) + } + q.table.lock.Unlock() + } +} + +// IsClosed returns a channel that is closed when this Queue is closed +func (q *UniqueQueue) IsClosed() <-chan struct{} { + return q.closed +} + +// IDs returns the current ids in the pool +func (q *UniqueQueue) IDs() []interface{} { + q.table.lock.Lock() + defer q.table.lock.Unlock() + ids := make([]interface{}, 0, len(q.table.pool)) + for id := range q.table.pool { + ids = append(ids, id) + } + return ids +} + // Queue returns channel of queue for retrieving instances. func (q *UniqueQueue) Queue() <-chan string { return q.queue @@ -45,18 +79,22 @@ func (q *UniqueQueue) Exist(id interface{}) bool { // AddFunc adds new instance to the queue with a custom runnable function, // the queue is blocked until the function exits. func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { - if q.Exist(id) { - return - } - idStr := com.ToStr(id) q.table.lock.Lock() + if _, ok := q.table.pool[idStr]; ok { + return + } q.table.pool[idStr] = struct{}{} if fn != nil { fn() } q.table.lock.Unlock() - q.queue <- idStr + select { + case <-q.closed: + return + case q.queue <- idStr: + return + } } // Add adds new instance to the queue. diff --git a/modules/webhook/deliver.go b/modules/webhook/deliver.go index b262505cea..9f5c938f83 100644 --- a/modules/webhook/deliver.go +++ b/modules/webhook/deliver.go @@ -5,6 +5,7 @@ package webhook import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -16,6 +17,7 @@ import ( "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "github.com/gobwas/glob" @@ -145,8 +147,14 @@ func Deliver(t *models.HookTask) error { } // DeliverHooks checks and delivers undelivered hooks. -// TODO: shoot more hooks at same time. -func DeliverHooks() { +// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue +// or a full queue. Then more hooks could be sent at same time. +func DeliverHooks(ctx context.Context) { + select { + case <-ctx.Done(): + return + default: + } tasks, err := models.FindUndeliveredHookTasks() if err != nil { log.Error("DeliverHooks: %v", err) @@ -155,33 +163,50 @@ func DeliverHooks() { // Update hook task status. for _, t := range tasks { + select { + case <-ctx.Done(): + return + default: + } if err = Deliver(t); err != nil { log.Error("deliver: %v", err) } } // Start listening on new hook requests. - for repoIDStr := range hookQueue.Queue() { - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) + for { + select { + case <-ctx.Done(): + hookQueue.Close() + return + case repoIDStr := <-hookQueue.Queue(): + log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) + hookQueue.Remove(repoIDStr) - repoID, err := com.StrTo(repoIDStr).Int64() - if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } + repoID, err := com.StrTo(repoIDStr).Int64() + if err != nil { + log.Error("Invalid repo ID: %s", repoIDStr) + continue + } - tasks, err := models.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) - continue - } - for _, t := range tasks { - if err = Deliver(t); err != nil { - log.Error("deliver: %v", err) + tasks, err := models.FindRepoUndeliveredHookTasks(repoID) + if err != nil { + log.Error("Get repository [%d] hook tasks: %v", repoID, err) + continue + } + for _, t := range tasks { + select { + case <-ctx.Done(): + return + default: + } + if err = Deliver(t); err != nil { + log.Error("deliver: %v", err) + } } } } + } var ( @@ -234,5 +259,5 @@ func InitDeliverHooks() { }, } - go DeliverHooks() + go graceful.GetManager().RunWithShutdownContext(DeliverHooks) } |