From 167e8f18da3aadcdcdd7bb8c488c39d73ac65803 Mon Sep 17 00:00:00 2001 From: zeripath Date: Tue, 15 Oct 2019 14:39:51 +0100 Subject: Restore Graceful Restarting & Socket Activation (#7274) * Prevent deadlock in indexer initialisation during graceful restart * Move from gracehttp to our own service to add graceful ssh * Add timeout for start of indexers and make hammer time configurable * Fix issue with re-initialization in indexer during tests * move the code to detect use of closed to graceful * Handle logs gracefully - add a pid suffix just before restart * Move to using a cond and a holder for indexers * use time.Since * Add some comments and attribution * update modules.txt * Use zero to disable timeout * Move RestartProcess to its own file * Add cleanup routine --- modules/graceful/cleanup.go | 38 +++++ modules/graceful/net.go | 209 ++++++++++++++++++++++++++ modules/graceful/restart.go | 67 +++++++++ modules/graceful/server.go | 267 +++++++++++++++++++++++++++++++++ modules/graceful/server_hooks.go | 119 +++++++++++++++ modules/graceful/server_http.go | 45 ++++++ modules/graceful/server_signals.go | 93 ++++++++++++ modules/indexer/issues/indexer.go | 193 ++++++++++++++++-------- modules/indexer/issues/indexer_test.go | 14 +- modules/indexer/repo.go | 52 +++++-- modules/setting/indexer.go | 3 + modules/setting/log.go | 11 +- modules/setting/setting.go | 4 + modules/ssh/ssh.go | 7 +- modules/ssh/ssh_graceful.go | 30 ++++ modules/ssh/ssh_windows.go | 24 +++ 16 files changed, 1083 insertions(+), 93 deletions(-) create mode 100644 modules/graceful/cleanup.go create mode 100644 modules/graceful/net.go create mode 100644 modules/graceful/restart.go create mode 100644 modules/graceful/server.go create mode 100644 modules/graceful/server_hooks.go create mode 100644 modules/graceful/server_http.go create mode 100644 modules/graceful/server_signals.go create mode 100644 modules/ssh/ssh_graceful.go create mode 100644 modules/ssh/ssh_windows.go (limited to 'modules') diff --git a/modules/graceful/cleanup.go b/modules/graceful/cleanup.go new file mode 100644 index 0000000000..1de087a999 --- /dev/null +++ b/modules/graceful/cleanup.go @@ -0,0 +1,38 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import "sync" + +var cleanupWaitGroup sync.WaitGroup + +func init() { + cleanupWaitGroup = sync.WaitGroup{} + + // There are three places that could inherit sockets: + // + // * HTTP or HTTPS main listener + // * HTTP redirection fallback + // * SSH + // + // If you add an additional place you must increment this number + // and add a function to call InformCleanup if it's not going to be used + cleanupWaitGroup.Add(3) + + // Wait till we're done getting all of the listeners and then close + // the unused ones + go func() { + cleanupWaitGroup.Wait() + // Ignore the error here there's not much we can do with it + // They're logged in the CloseProvidedListeners function + _ = CloseProvidedListeners() + }() +} + +// InformCleanup tells the cleanup wait group that we have either taken a listener +// or will not be taking a listener +func InformCleanup() { + cleanupWaitGroup.Done() +} diff --git a/modules/graceful/net.go b/modules/graceful/net.go new file mode 100644 index 0000000000..f2612e21be --- /dev/null +++ b/modules/graceful/net.go @@ -0,0 +1,209 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +// This code is heavily inspired by the archived gofacebook/gracenet/net.go handler + +package graceful + +import ( + "fmt" + "net" + "os" + "strconv" + "strings" + "sync" + + "code.gitea.io/gitea/modules/log" +) + +const ( + listenFDs = "LISTEN_FDS" + startFD = 3 +) + +// In order to keep the working directory the same as when we started we record +// it at startup. +var originalWD, _ = os.Getwd() + +var ( + once = sync.Once{} + mutex = sync.Mutex{} + + providedListeners = []net.Listener{} + activeListeners = []net.Listener{} +) + +func getProvidedFDs() (savedErr error) { + // Only inherit the provided FDS once but we will save the error so that repeated calls to this function will return the same error + once.Do(func() { + mutex.Lock() + defer mutex.Unlock() + + numFDs := os.Getenv(listenFDs) + if numFDs == "" { + return + } + n, err := strconv.Atoi(numFDs) + if err != nil { + savedErr = fmt.Errorf("%s is not a number: %s. Err: %v", listenFDs, numFDs, err) + return + } + + for i := startFD; i < n+startFD; i++ { + file := os.NewFile(uintptr(i), fmt.Sprintf("listener_FD%d", i)) + + l, err := net.FileListener(file) + if err == nil { + // Close the inherited file if it's a listener + if err = file.Close(); err != nil { + savedErr = fmt.Errorf("error closing provided socket fd %d: %s", i, err) + return + } + providedListeners = append(providedListeners, l) + continue + } + + // If needed we can handle packetconns here. + savedErr = fmt.Errorf("Error getting provided socket fd %d: %v", i, err) + return + } + }) + return savedErr +} + +// CloseProvidedListeners closes all unused provided listeners. +func CloseProvidedListeners() error { + mutex.Lock() + defer mutex.Unlock() + var returnableError error + for _, l := range providedListeners { + err := l.Close() + if err != nil { + log.Error("Error in closing unused provided listener: %v", err) + if returnableError != nil { + returnableError = fmt.Errorf("%v & %v", returnableError, err) + } else { + returnableError = err + } + } + } + providedListeners = []net.Listener{} + + return returnableError +} + +// GetListener obtains a listener for the local network address. The network must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It +// returns an provided net.Listener for the matching network and address, or +// creates a new one using net.Listen. +func GetListener(network, address string) (net.Listener, error) { + // Add a deferral to say that we've tried to grab a listener + defer InformCleanup() + switch network { + case "tcp", "tcp4", "tcp6": + tcpAddr, err := net.ResolveTCPAddr(network, address) + if err != nil { + return nil, err + } + return GetListenerTCP(network, tcpAddr) + case "unix", "unixpacket": + unixAddr, err := net.ResolveUnixAddr(network, address) + if err != nil { + return nil, err + } + return GetListenerUnix(network, unixAddr) + default: + return nil, net.UnknownNetworkError(network) + } +} + +// GetListenerTCP announces on the local network address. The network must be: +// "tcp", "tcp4" or "tcp6". It returns a provided net.Listener for the +// matching network and address, or creates a new one using net.ListenTCP. +func GetListenerTCP(network string, address *net.TCPAddr) (*net.TCPListener, error) { + if err := getProvidedFDs(); err != nil { + return nil, err + } + + mutex.Lock() + defer mutex.Unlock() + + // look for a provided listener + for i, l := range providedListeners { + if isSameAddr(l.Addr(), address) { + providedListeners = append(providedListeners[:i], providedListeners[i+1:]...) + + activeListeners = append(activeListeners, l) + return l.(*net.TCPListener), nil + } + } + + // no provided listener for this address -> make a fresh listener + l, err := net.ListenTCP(network, address) + if err != nil { + return nil, err + } + activeListeners = append(activeListeners, l) + return l, nil +} + +// GetListenerUnix announces on the local network address. The network must be: +// "unix" or "unixpacket". It returns a provided net.Listener for the +// matching network and address, or creates a new one using net.ListenUnix. +func GetListenerUnix(network string, address *net.UnixAddr) (*net.UnixListener, error) { + if err := getProvidedFDs(); err != nil { + return nil, err + } + + mutex.Lock() + defer mutex.Unlock() + + // look for a provided listener + for i, l := range providedListeners { + if isSameAddr(l.Addr(), address) { + providedListeners = append(providedListeners[:i], providedListeners[i+1:]...) + activeListeners = append(activeListeners, l) + return l.(*net.UnixListener), nil + } + } + + // make a fresh listener + l, err := net.ListenUnix(network, address) + if err != nil { + return nil, err + } + activeListeners = append(activeListeners, l) + return l, nil +} + +func isSameAddr(a1, a2 net.Addr) bool { + // If the addresses are not on the same network fail. + if a1.Network() != a2.Network() { + return false + } + + // If the two addresses have the same string representation they're equal + a1s := a1.String() + a2s := a2.String() + if a1s == a2s { + return true + } + + // This allows for ipv6 vs ipv4 local addresses to compare as equal. This + // scenario is common when listening on localhost. + const ipv6prefix = "[::]" + a1s = strings.TrimPrefix(a1s, ipv6prefix) + a2s = strings.TrimPrefix(a2s, ipv6prefix) + const ipv4prefix = "0.0.0.0" + a1s = strings.TrimPrefix(a1s, ipv4prefix) + a2s = strings.TrimPrefix(a2s, ipv4prefix) + return a1s == a2s +} + +func getActiveListeners() []net.Listener { + mutex.Lock() + defer mutex.Unlock() + listeners := make([]net.Listener, len(activeListeners)) + copy(listeners, activeListeners) + return listeners +} diff --git a/modules/graceful/restart.go b/modules/graceful/restart.go new file mode 100644 index 0000000000..33b3c4d417 --- /dev/null +++ b/modules/graceful/restart.go @@ -0,0 +1,67 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +// This code is heavily inspired by the archived gofacebook/gracenet/net.go handler + +package graceful + +import ( + "fmt" + "os" + "os/exec" + "strings" +) + +// RestartProcess starts a new process passing it the active listeners. It +// doesn't fork, but starts a new process using the same environment and +// arguments as when it was originally started. This allows for a newly +// deployed binary to be started. It returns the pid of the newly started +// process when successful. +func RestartProcess() (int, error) { + listeners := getActiveListeners() + + // Extract the fds from the listeners. + files := make([]*os.File, len(listeners)) + for i, l := range listeners { + var err error + // Now, all our listeners actually have File() functions so instead of + // individually casting we just use a hacky interface + files[i], err = l.(filer).File() + if err != nil { + return 0, err + } + // Remember to close these at the end. + defer files[i].Close() + } + + // Use the original binary location. This works with symlinks such that if + // the file it points to has been changed we will use the updated symlink. + argv0, err := exec.LookPath(os.Args[0]) + if err != nil { + return 0, err + } + + // Pass on the environment and replace the old count key with the new one. + var env []string + for _, v := range os.Environ() { + if !strings.HasPrefix(v, listenFDs+"=") { + env = append(env, v) + } + } + env = append(env, fmt.Sprintf("%s=%d", listenFDs, len(listeners))) + + allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) + process, err := os.StartProcess(argv0, os.Args, &os.ProcAttr{ + Dir: originalWD, + Env: env, + Files: allFiles, + }) + if err != nil { + return 0, err + } + return process.Pid, nil +} + +type filer interface { + File() (*os.File, error) +} diff --git a/modules/graceful/server.go b/modules/graceful/server.go new file mode 100644 index 0000000000..efe8b264b3 --- /dev/null +++ b/modules/graceful/server.go @@ -0,0 +1,267 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +// This code is highly inspired by endless go + +package graceful + +import ( + "crypto/tls" + "net" + "os" + "strings" + "sync" + "syscall" + "time" + + "code.gitea.io/gitea/modules/log" +) + +type state uint8 + +const ( + stateInit state = iota + stateRunning + stateShuttingDown + stateTerminate +) + +var ( + // RWMutex for when adding servers or shutting down + runningServerReg sync.RWMutex + // ensure we only fork once + runningServersForked bool + + // DefaultReadTimeOut default read timeout + DefaultReadTimeOut time.Duration + // DefaultWriteTimeOut default write timeout + DefaultWriteTimeOut time.Duration + // DefaultMaxHeaderBytes default max header bytes + DefaultMaxHeaderBytes int + + // IsChild reports if we are a fork iff LISTEN_FDS is set and our parent PID is not 1 + IsChild = len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1 +) + +func init() { + runningServerReg = sync.RWMutex{} + + DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB) +} + +// ServeFunction represents a listen.Accept loop +type ServeFunction = func(net.Listener) error + +// Server represents our graceful server +type Server struct { + network string + address string + listener net.Listener + PreSignalHooks map[os.Signal][]func() + PostSignalHooks map[os.Signal][]func() + wg sync.WaitGroup + sigChan chan os.Signal + state state + lock *sync.RWMutex + BeforeBegin func(network, address string) + OnShutdown func() +} + +// NewServer creates a server on network at provided address +func NewServer(network, address string) *Server { + runningServerReg.Lock() + defer runningServerReg.Unlock() + + if IsChild { + log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid()) + } else { + log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid()) + } + srv := &Server{ + wg: sync.WaitGroup{}, + sigChan: make(chan os.Signal), + PreSignalHooks: map[os.Signal][]func(){}, + PostSignalHooks: map[os.Signal][]func(){}, + state: stateInit, + lock: &sync.RWMutex{}, + network: network, + address: address, + } + + srv.BeforeBegin = func(network, addr string) { + log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid()) + } + + return srv +} + +// ListenAndServe listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func (srv *Server) ListenAndServe(serve ServeFunction) error { + go srv.handleSignals() + + l, err := GetListener(srv.network, srv.address) + if err != nil { + log.Error("Unable to GetListener: %v", err) + return err + } + + srv.listener = newWrappedListener(l, srv) + + if IsChild { + _ = syscall.Kill(syscall.Getppid(), syscall.SIGTERM) + } + + srv.BeforeBegin(srv.network, srv.address) + + return srv.Serve(serve) +} + +// ListenAndServeTLS listens on the provided network address and then calls +// Serve to handle requests on incoming TLS connections. +// +// Filenames containing a certificate and matching private key for the server must +// be provided. If the certificate is signed by a certificate authority, the +// certFile should be the concatenation of the server's certificate followed by the +// CA's certificate. +func (srv *Server) ListenAndServeTLS(certFile, keyFile string, serve ServeFunction) error { + config := &tls.Config{} + if config.NextProtos == nil { + config.NextProtos = []string{"http/1.1"} + } + + config.Certificates = make([]tls.Certificate, 1) + var err error + config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + log.Error("Failed to load https cert file %s for %s:%s: %v", certFile, srv.network, srv.address, err) + return err + } + return srv.ListenAndServeTLSConfig(config, serve) +} + +// ListenAndServeTLSConfig listens on the provided network address and then calls +// Serve to handle requests on incoming TLS connections. +func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFunction) error { + go srv.handleSignals() + + l, err := GetListener(srv.network, srv.address) + if err != nil { + log.Error("Unable to get Listener: %v", err) + return err + } + + wl := newWrappedListener(l, srv) + srv.listener = tls.NewListener(wl, tlsConfig) + + if IsChild { + _ = syscall.Kill(syscall.Getppid(), syscall.SIGTERM) + } + srv.BeforeBegin(srv.network, srv.address) + + return srv.Serve(serve) +} + +// Serve accepts incoming HTTP connections on the wrapped listener l, creating a new +// service goroutine for each. The service goroutines read requests and then call +// handler to reply to them. Handler is typically nil, in which case the +// DefaultServeMux is used. +// +// In addition to the standard Serve behaviour each connection is added to a +// sync.Waitgroup so that all outstanding connections can be served before shutting +// down the server. +func (srv *Server) Serve(serve ServeFunction) error { + defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid()) + srv.setState(stateRunning) + err := serve(srv.listener) + log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) + srv.wg.Wait() + srv.setState(stateTerminate) + // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil + if err != nil && strings.Contains(err.Error(), "use of closed") { + return nil + } + return err +} + +func (srv *Server) getState() state { + srv.lock.RLock() + defer srv.lock.RUnlock() + + return srv.state +} + +func (srv *Server) setState(st state) { + srv.lock.Lock() + defer srv.lock.Unlock() + + srv.state = st +} + +type wrappedListener struct { + net.Listener + stopped bool + server *Server +} + +func newWrappedListener(l net.Listener, srv *Server) *wrappedListener { + return &wrappedListener{ + Listener: l, + server: srv, + } +} + +func (wl *wrappedListener) Accept() (net.Conn, error) { + var c net.Conn + // Set keepalive on TCPListeners connections. + if tcl, ok := wl.Listener.(*net.TCPListener); ok { + tc, err := tcl.AcceptTCP() + if err != nil { + return nil, err + } + _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener + _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener + c = tc + } else { + var err error + c, err = wl.Listener.Accept() + if err != nil { + return nil, err + } + } + + c = wrappedConn{ + Conn: c, + server: wl.server, + } + + wl.server.wg.Add(1) + return c, nil +} + +func (wl *wrappedListener) Close() error { + if wl.stopped { + return syscall.EINVAL + } + + wl.stopped = true + return wl.Listener.Close() +} + +func (wl *wrappedListener) File() (*os.File, error) { + // returns a dup(2) - FD_CLOEXEC flag *not* set so the listening socket can be passed to child processes + return wl.Listener.(filer).File() +} + +type wrappedConn struct { + net.Conn + server *Server +} + +func (w wrappedConn) Close() error { + err := w.Conn.Close() + if err == nil { + w.server.wg.Done() + } + return err +} diff --git a/modules/graceful/server_hooks.go b/modules/graceful/server_hooks.go new file mode 100644 index 0000000000..a80d955556 --- /dev/null +++ b/modules/graceful/server_hooks.go @@ -0,0 +1,119 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import ( + "errors" + "fmt" + "os" + "runtime" + "time" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +// shutdown closes the listener so that no new connections are accepted +// and starts a goroutine that will hammer (stop all running requests) the server +// after setting.GracefulHammerTime. +func (srv *Server) shutdown() { + // only shutdown if we're running. + if srv.getState() != stateRunning { + return + } + + srv.setState(stateShuttingDown) + if setting.GracefulHammerTime >= 0 { + go srv.hammerTime(setting.GracefulHammerTime) + } + + if srv.OnShutdown != nil { + srv.OnShutdown() + } + err := srv.listener.Close() + if err != nil { + log.Error("PID: %d Listener.Close() error: %v", os.Getpid(), err) + } else { + log.Info("PID: %d Listener (%s) closed.", os.Getpid(), srv.listener.Addr()) + } +} + +// hammerTime forces the server to shutdown in a given timeout - whether it +// finished outstanding requests or not. if Read/WriteTimeout are not set or the +// max header size is very big a connection could hang... +// +// srv.Serve() will not return until all connections are served. this will +// unblock the srv.wg.Wait() in Serve() thus causing ListenAndServe* functions to +// return. +func (srv *Server) hammerTime(d time.Duration) { + defer func() { + // We call srv.wg.Done() until it panics. + // This happens if we call Done() when the WaitGroup counter is already at 0 + // So if it panics -> we're done, Serve() will return and the + // parent will goroutine will exit. + if r := recover(); r != nil { + log.Error("WaitGroup at 0: Error: %v", r) + } + }() + if srv.getState() != stateShuttingDown { + return + } + time.Sleep(d) + log.Warn("Forcefully shutting down parent") + for { + if srv.getState() == stateTerminate { + break + } + srv.wg.Done() + + // Give other goroutines a chance to finish before we forcibly stop them. + runtime.Gosched() + } +} + +func (srv *Server) fork() error { + runningServerReg.Lock() + defer runningServerReg.Unlock() + + // only one server instance should fork! + if runningServersForked { + return errors.New("another process already forked. Ignoring this one") + } + + runningServersForked = true + + // We need to move the file logs to append pids + setting.RestartLogsWithPIDSuffix() + + _, err := RestartProcess() + + return err +} + +// RegisterPreSignalHook registers a function to be run before the signal handler for +// a given signal. These are not mutex locked and should therefore be only called before Serve. +func (srv *Server) RegisterPreSignalHook(sig os.Signal, f func()) (err error) { + for _, s := range hookableSignals { + if s == sig { + srv.PreSignalHooks[sig] = append(srv.PreSignalHooks[sig], f) + return + } + } + err = fmt.Errorf("Signal %v is not supported", sig) + return +} + +// RegisterPostSignalHook registers a function to be run after the signal handler for +// a given signal. These are not mutex locked and should therefore be only called before Serve. +func (srv *Server) RegisterPostSignalHook(sig os.Signal, f func()) (err error) { + for _, s := range hookableSignals { + if s == sig { + srv.PostSignalHooks[sig] = append(srv.PostSignalHooks[sig], f) + return + } + } + err = fmt.Errorf("Signal %v is not supported", sig) + return +} diff --git a/modules/graceful/server_http.go b/modules/graceful/server_http.go new file mode 100644 index 0000000000..1052637d5e --- /dev/null +++ b/modules/graceful/server_http.go @@ -0,0 +1,45 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import ( + "crypto/tls" + "net/http" +) + +func newHTTPServer(network, address string, handler http.Handler) (*Server, ServeFunction) { + server := NewServer(network, address) + httpServer := http.Server{ + ReadTimeout: DefaultReadTimeOut, + WriteTimeout: DefaultWriteTimeOut, + MaxHeaderBytes: DefaultMaxHeaderBytes, + Handler: handler, + } + server.OnShutdown = func() { + httpServer.SetKeepAlivesEnabled(false) + } + return server, httpServer.Serve +} + +// HTTPListenAndServe listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func HTTPListenAndServe(network, address string, handler http.Handler) error { + server, lHandler := newHTTPServer(network, address, handler) + return server.ListenAndServe(lHandler) +} + +// HTTPListenAndServeTLS listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func HTTPListenAndServeTLS(network, address, certFile, keyFile string, handler http.Handler) error { + server, lHandler := newHTTPServer(network, address, handler) + return server.ListenAndServeTLS(certFile, keyFile, lHandler) +} + +// HTTPListenAndServeTLSConfig listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func HTTPListenAndServeTLSConfig(network, address string, tlsConfig *tls.Config, handler http.Handler) error { + server, lHandler := newHTTPServer(network, address, handler) + return server.ListenAndServeTLSConfig(tlsConfig, lHandler) +} diff --git a/modules/graceful/server_signals.go b/modules/graceful/server_signals.go new file mode 100644 index 0000000000..ea76b5509c --- /dev/null +++ b/modules/graceful/server_signals.go @@ -0,0 +1,93 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import ( + "os" + "os/signal" + "syscall" + "time" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +var hookableSignals []os.Signal + +func init() { + hookableSignals = []os.Signal{ + syscall.SIGHUP, + syscall.SIGUSR1, + syscall.SIGUSR2, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGTSTP, + } +} + +// handleSignals listens for os Signals and calls any hooked in function that the +// user had registered with the signal. +func (srv *Server) handleSignals() { + var sig os.Signal + + signal.Notify( + srv.sigChan, + hookableSignals..., + ) + + pid := syscall.Getpid() + for { + sig = <-srv.sigChan + srv.preSignalHooks(sig) + switch sig { + case syscall.SIGHUP: + if setting.GracefulRestartable { + log.Info("PID: %d. Received SIGHUP. Forking...", pid) + err := srv.fork() + if err != nil { + log.Error("Error whilst forking from PID: %d : %v", pid, err) + } + } else { + log.Info("PID: %d. Received SIGHUP. Not set restartable. Shutting down...", pid) + + srv.shutdown() + } + case syscall.SIGUSR1: + log.Info("PID %d. Received SIGUSR1.", pid) + case syscall.SIGUSR2: + log.Warn("PID %d. Received SIGUSR2. Hammering...", pid) + srv.hammerTime(0 * time.Second) + case syscall.SIGINT: + log.Warn("PID %d. Received SIGINT. Shutting down...", pid) + srv.shutdown() + case syscall.SIGTERM: + log.Warn("PID %d. Received SIGTERM. Shutting down...", pid) + srv.shutdown() + case syscall.SIGTSTP: + log.Info("PID %d. Received SIGTSTP.") + default: + log.Info("PID %d. Received %v.", sig) + } + srv.postSignalHooks(sig) + } +} + +func (srv *Server) preSignalHooks(sig os.Signal) { + if _, notSet := srv.PreSignalHooks[sig]; !notSet { + return + } + for _, f := range srv.PreSignalHooks[sig] { + f() + } +} + +func (srv *Server) postSignalHooks(sig os.Signal) { + if _, notSet := srv.PostSignalHooks[sig]; !notSet { + return + } + for _, f := range srv.PostSignalHooks[sig] { + f() + } +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index df8bfd6305..4f410daf4c 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,9 +5,11 @@ package issues import ( - "fmt" + "sync" + "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" @@ -45,78 +47,143 @@ type Indexer interface { Search(kw string, repoID int64, limit, start int) (*SearchResult, error) } +type indexerHolder struct { + indexer Indexer + mutex sync.RWMutex + cond *sync.Cond +} + +func newIndexerHolder() *indexerHolder { + h := &indexerHolder{} + h.cond = sync.NewCond(h.mutex.RLocker()) + return h +} + +func (h *indexerHolder) set(indexer Indexer) { + h.mutex.Lock() + defer h.mutex.Unlock() + h.indexer = indexer + h.cond.Broadcast() +} + +func (h *indexerHolder) get() Indexer { + h.mutex.RLock() + defer h.mutex.RUnlock() + if h.indexer == nil { + h.cond.Wait() + } + return h.indexer +} + var ( + issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength) // issueIndexerQueue queue of issue ids to be updated issueIndexerQueue Queue - issueIndexer Indexer + holder = newIndexerHolder() ) // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until // all issue index done. -func InitIssueIndexer(syncReindex bool) error { - var populate bool - var dummyQueue bool - switch setting.Indexer.IssueType { - case "bleve": - issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() - if err != nil { - return err +func InitIssueIndexer(syncReindex bool) { + waitChannel := make(chan time.Duration) + go func() { + start := time.Now() + log.Info("Initializing Issue Indexer") + var populate bool + var dummyQueue bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) + } + populate = !exist + holder.set(issueIndexer) + case "db": + issueIndexer := &DBIndexer{} + holder.set(issueIndexer) + dummyQueue = true + default: + log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } - populate = !exist - case "db": - issueIndexer = &DBIndexer{} - dummyQueue = true - default: - return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) - } - if dummyQueue { - issueIndexerQueue = &DummyQueue{} - return nil - } + if dummyQueue { + issueIndexerQueue = &DummyQueue{} + } else { + var err error + switch setting.Indexer.IssueQueueType { + case setting.LevelQueueType: + issueIndexerQueue, err = NewLevelQueue( + holder.get(), + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal( + "Unable create level queue for issue queue dir: %s batch number: %d : %v", + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber, + err) + } + case setting.ChannelQueueType: + issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber) + case setting.RedisQueueType: + addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) + if err != nil { + log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal("Unable to create RedisQueue: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + default: + log.Fatal("Unsupported indexer queue type: %v", + setting.Indexer.IssueQueueType) + } - var err error - switch setting.Indexer.IssueQueueType { - case setting.LevelQueueType: - issueIndexerQueue, err = NewLevelQueue( - issueIndexer, - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err + go func() { + err = issueIndexerQueue.Run() + if err != nil { + log.Error("issueIndexerQueue.Run: %v", err) + } + }() } - case setting.ChannelQueueType: - issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber) - case setting.RedisQueueType: - addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) - if err != nil { - return err - } - issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType) - } - go func() { - err = issueIndexerQueue.Run() - if err != nil { - log.Error("issueIndexerQueue.Run: %v", err) - } - }() + go func() { + for data := range issueIndexerChannel { + _ = issueIndexerQueue.Push(data) + } + }() - if populate { - if syncReindex { - populateIssueIndexer() - } else { - go populateIssueIndexer() + if populate { + if syncReindex { + populateIssueIndexer() + } else { + go populateIssueIndexer() + } } + waitChannel <- time.Since(start) + }() + if syncReindex { + <-waitChannel + } else if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.IsChild && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Issue Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) + } + }() } - - return nil } // populateIssueIndexer populate the issue indexer with issue data @@ -166,13 +233,13 @@ func UpdateIssueIndexer(issue *models.Issue) { comments = append(comments, comment.Content) } } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, Content: issue.Content, Comments: comments, - }) + } } // DeleteRepoIssueIndexer deletes repo's all issues indexes @@ -188,16 +255,16 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { return } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ IDs: ids, IsDelete: true, - }) + } } // SearchIssuesByKeyword search issue ids by keywords and repo id func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { var issueIDs []int64 - res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + res, err := holder.get().Search(keyword, repoID, 1000, 0) if err != nil { return nil, err } diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go index 59a7beed47..212c2edfbe 100644 --- a/modules/indexer/issues/indexer_test.go +++ b/modules/indexer/issues/indexer_test.go @@ -5,7 +5,6 @@ package issues import ( - "fmt" "os" "path/filepath" "testing" @@ -17,11 +16,6 @@ import ( "github.com/stretchr/testify/assert" ) -func fatalTestError(fmtStr string, args ...interface{}) { - fmt.Fprintf(os.Stderr, fmtStr, args...) - os.Exit(1) -} - func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..", "..")) } @@ -32,9 +26,7 @@ func TestBleveSearchIssues(t *testing.T) { os.RemoveAll(setting.Indexer.IssueQueueDir) os.RemoveAll(setting.Indexer.IssuePath) setting.Indexer.IssueType = "bleve" - if err := InitIssueIndexer(true); err != nil { - fatalTestError("Error InitIssueIndexer: %v\n", err) - } + InitIssueIndexer(true) time.Sleep(5 * time.Second) @@ -59,9 +51,7 @@ func TestDBSearchIssues(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) setting.Indexer.IssueType = "db" - if err := InitIssueIndexer(true); err != nil { - fatalTestError("Error InitIssueIndexer: %v\n", err) - } + InitIssueIndexer(true) ids, err := SearchIssuesByKeyword(1, "issue2") assert.NoError(t, err) diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go index 91ed173aa7..841f29acd7 100644 --- a/modules/indexer/repo.go +++ b/modules/indexer/repo.go @@ -6,6 +6,7 @@ package indexer import ( "strings" + "sync" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -25,8 +26,36 @@ const ( repoIndexerLatestVersion = 4 ) +type bleveIndexerHolder struct { + index bleve.Index + mutex sync.RWMutex + cond *sync.Cond +} + +func newBleveIndexerHolder() *bleveIndexerHolder { + b := &bleveIndexerHolder{} + b.cond = sync.NewCond(b.mutex.RLocker()) + return b +} + +func (r *bleveIndexerHolder) set(index bleve.Index) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.index = index + r.cond.Broadcast() +} + +func (r *bleveIndexerHolder) get() bleve.Index { + r.mutex.RLock() + defer r.mutex.RUnlock() + if r.index == nil { + r.cond.Wait() + } + return r.index +} + // repoIndexer (thread-safe) index for repository contents -var repoIndexer bleve.Index +var indexerHolder = newBleveIndexerHolder() // RepoIndexerOp type of operation to perform on repo indexer type RepoIndexerOp int @@ -73,12 +102,12 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) // InitRepoIndexer initialize repo indexer func InitRepoIndexer(populateIndexer func() error) { - var err error - repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { log.Fatal("InitRepoIndexer: %v", err) } - if repoIndexer != nil { + if indexer != nil { + indexerHolder.set(indexer) return } @@ -92,7 +121,6 @@ func InitRepoIndexer(populateIndexer func() error) { // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) error { - var err error docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -103,9 +131,9 @@ func createRepoIndexer(path string, latestVersion int) error { docMapping.AddFieldMappingsAt("Content", textFieldMapping) mapping := bleve.NewIndexMapping() - if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { return err - } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ "type": custom.Name, "char_filters": []string{}, "tokenizer": unicode.Name, @@ -117,10 +145,12 @@ func createRepoIndexer(path string, latestVersion int) error { mapping.AddDocumentMapping(repoIndexerDocType, docMapping) mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - repoIndexer, err = bleve.New(path, mapping) + indexer, err := bleve.New(path, mapping) if err != nil { return err } + indexerHolder.set(indexer) + return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ Version: latestVersion, }) @@ -140,14 +170,14 @@ func filenameOfIndexerID(indexerID string) string { // RepoIndexerBatch batch to add updates to func RepoIndexerBatch() rupture.FlushingBatch { - return rupture.NewFlushingBatch(repoIndexer, maxBatchSize) + return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) } // DeleteRepoFromIndexer delete all of a repo's files from indexer func DeleteRepoFromIndexer(repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return err } @@ -196,7 +226,7 @@ func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (i searchRequest.Fields = []string{"Content", "RepoID"} searchRequest.IncludeLocations = true - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return 0, nil, err } diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go index 30c670d407..fbaef3fcf2 100644 --- a/modules/setting/indexer.go +++ b/modules/setting/indexer.go @@ -8,6 +8,7 @@ import ( "path" "path/filepath" "strings" + "time" "code.gitea.io/gitea/modules/log" @@ -34,6 +35,7 @@ var ( IssueQueueDir string IssueQueueConnStr string IssueQueueBatchNumber int + StartupTimeout time.Duration IncludePatterns []glob.Glob ExcludePatterns []glob.Glob }{ @@ -67,6 +69,7 @@ func newIndexerService() { Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue")) Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, "")) Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20) + Indexer.StartupTimeout = sec.Key("STARTUP_TIMEOUT").MustDuration(30 * time.Second) } // IndexerGlobFromString parses a comma separated list of patterns and returns a glob.Glob slice suited for repo indexing diff --git a/modules/setting/log.go b/modules/setting/log.go index 5e2d2d769d..cb8f142084 100644 --- a/modules/setting/log.go +++ b/modules/setting/log.go @@ -6,6 +6,7 @@ package setting import ( "encoding/json" + "fmt" golog "log" "os" "path" @@ -17,6 +18,8 @@ import ( ini "gopkg.in/ini.v1" ) +var filenameSuffix = "" + type defaultLogOptions struct { levelName string // LogLevel flags string @@ -112,7 +115,7 @@ func generateLogConfig(sec *ini.Section, name string, defaults defaultLogOptions panic(err.Error()) } - logConfig["filename"] = logPath + logConfig["filename"] = logPath + filenameSuffix logConfig["rotate"] = sec.Key("LOG_ROTATE").MustBool(true) logConfig["maxsize"] = 1 << uint(sec.Key("MAX_SIZE_SHIFT").MustInt(28)) logConfig["daily"] = sec.Key("DAILY_ROTATE").MustBool(true) @@ -277,6 +280,12 @@ func newLogService() { golog.SetOutput(log.NewLoggerAsWriter("INFO", log.GetLogger(log.DEFAULT))) } +// RestartLogsWithPIDSuffix restarts the logs with a PID suffix on files +func RestartLogsWithPIDSuffix() { + filenameSuffix = fmt.Sprintf(".%d", os.Getpid()) + NewLogServices(false) +} + // NewLogServices creates all the log services func NewLogServices(disableConsole bool) { newLogService() diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 629a89766f..144882976f 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -97,6 +97,8 @@ var ( LetsEncryptTOS bool LetsEncryptDirectory string LetsEncryptEmail string + GracefulRestartable bool + GracefulHammerTime time.Duration SSH = struct { Disabled bool `ini:"DISABLE_SSH"` @@ -563,6 +565,8 @@ func NewContext() { Domain = sec.Key("DOMAIN").MustString("localhost") HTTPAddr = sec.Key("HTTP_ADDR").MustString("0.0.0.0") HTTPPort = sec.Key("HTTP_PORT").MustString("3000") + GracefulRestartable = sec.Key("ALLOW_GRACEFUL_RESTARTS").MustBool(true) + GracefulHammerTime = sec.Key("GRACEFUL_HAMMER_TIME").MustDuration(60 * time.Second) defaultAppURL := string(Protocol) + "://" + Domain if (Protocol == HTTP && HTTPPort != "80") || (Protocol == HTTPS && HTTPPort != "443") { diff --git a/modules/ssh/ssh.go b/modules/ssh/ssh.go index 7ff0c32326..e7a694683a 100644 --- a/modules/ssh/ssh.go +++ b/modules/ssh/ssh.go @@ -183,12 +183,7 @@ func Listen(host string, port int, ciphers []string, keyExchanges []string, macs log.Error("Failed to set Host Key. %s", err) } - go func() { - err := srv.ListenAndServe() - if err != nil { - log.Error("Failed to serve with builtin SSH server. %s", err) - } - }() + go listen(&srv) } diff --git a/modules/ssh/ssh_graceful.go b/modules/ssh/ssh_graceful.go new file mode 100644 index 0000000000..d66c7d6540 --- /dev/null +++ b/modules/ssh/ssh_graceful.go @@ -0,0 +1,30 @@ +// +build !windows + +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package ssh + +import ( + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + + "github.com/gliderlabs/ssh" +) + +func listen(server *ssh.Server) { + gracefulServer := graceful.NewServer("tcp", server.Addr) + + err := gracefulServer.ListenAndServe(server.Serve) + if err != nil { + log.Critical("Failed to start SSH server: %v", err) + } + log.Info("SSH Listener: %s Closed", server.Addr) + +} + +// Unused informs our cleanup routine that we will not be using a ssh port +func Unused() { + graceful.InformCleanup() +} diff --git a/modules/ssh/ssh_windows.go b/modules/ssh/ssh_windows.go new file mode 100644 index 0000000000..55032e17cd --- /dev/null +++ b/modules/ssh/ssh_windows.go @@ -0,0 +1,24 @@ +// +build windows + +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package ssh + +import ( + "code.gitea.io/gitea/modules/log" + "github.com/gliderlabs/ssh" +) + +func listen(server *ssh.Server) { + err := server.ListenAndServe() + if err != nil { + log.Critical("Failed to serve with builtin SSH server. %s", err) + } +} + +// Unused does nothing on windows +func Unused() { + // Do nothing +} -- cgit v1.2.3