diff options
author | zeripath <art27@cantab.net> | 2019-10-15 14:39:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-15 14:39:51 +0100 |
commit | 167e8f18da3aadcdcdd7bb8c488c39d73ac65803 (patch) | |
tree | c2ad32fc8ced5657f62034551e72134a0a238fcb | |
parent | 4a290bd64cd4c4ba77b9f3c4908a76cc521f9621 (diff) | |
download | gitea-167e8f18da3aadcdcdd7bb8c488c39d73ac65803.tar.gz gitea-167e8f18da3aadcdcdd7bb8c488c39d73ac65803.zip |
Restore Graceful Restarting & Socket Activation (#7274)
* Prevent deadlock in indexer initialisation during graceful restart
* Move from gracehttp to our own service to add graceful ssh
* Add timeout for start of indexers and make hammer time configurable
* Fix issue with re-initialization in indexer during tests
* move the code to detect use of closed to graceful
* Handle logs gracefully - add a pid suffix just before restart
* Move to using a cond and a holder for indexers
* use time.Since
* Add some comments and attribution
* update modules.txt
* Use zero to disable timeout
* Move RestartProcess to its own file
* Add cleanup routine
45 files changed, 1199 insertions, 2006 deletions
diff --git a/cmd/web.go b/cmd/web.go index 9a5ce5d2b6..ae05b9e145 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -75,17 +75,13 @@ func runLetsEncrypt(listenAddr, domain, directory, email string, m http.Handler) } go func() { log.Info("Running Let's Encrypt handler on %s", setting.HTTPAddr+":"+setting.PortToRedirect) - var err = http.ListenAndServe(setting.HTTPAddr+":"+setting.PortToRedirect, certManager.HTTPHandler(http.HandlerFunc(runLetsEncryptFallbackHandler))) // all traffic coming into HTTP will be redirect to HTTPS automatically (LE HTTP-01 validation happens here) + // all traffic coming into HTTP will be redirect to HTTPS automatically (LE HTTP-01 validation happens here) + var err = runHTTP(setting.HTTPAddr+":"+setting.PortToRedirect, certManager.HTTPHandler(http.HandlerFunc(runLetsEncryptFallbackHandler))) if err != nil { log.Fatal("Failed to start the Let's Encrypt handler on port %s: %v", setting.PortToRedirect, err) } }() - server := &http.Server{ - Addr: listenAddr, - Handler: m, - TLSConfig: certManager.TLSConfig(), - } - return server.ListenAndServeTLS("", "") + return runHTTPSWithTLSConfig(listenAddr, certManager.TLSConfig(), context2.ClearHandler(m)) } func runLetsEncryptFallbackHandler(w http.ResponseWriter, r *http.Request) { @@ -101,12 +97,21 @@ func runLetsEncryptFallbackHandler(w http.ResponseWriter, r *http.Request) { } func runWeb(ctx *cli.Context) error { + if os.Getppid() > 1 && len(os.Getenv("LISTEN_FDS")) > 0 { + log.Info("Restarting Gitea on PID: %d from parent PID: %d", os.Getpid(), os.Getppid()) + } else { + log.Info("Starting Gitea on PID: %d", os.Getpid()) + } + + // Set pid file setting if ctx.IsSet("pid") { setting.CustomPID = ctx.String("pid") } + // Perform global initialization routers.GlobalInit() + // Set up Macaron m := routes.NewMacaron() routes.RegisterRoutes(m) @@ -164,6 +169,7 @@ func runWeb(ctx *cli.Context) error { var err error switch setting.Protocol { case setting.HTTP: + NoHTTPRedirector() err = runHTTP(listenAddr, context2.ClearHandler(m)) case setting.HTTPS: if setting.EnableLetsEncrypt { @@ -172,9 +178,15 @@ func runWeb(ctx *cli.Context) error { } if setting.RedirectOtherPort { go runHTTPRedirector() + } else { + NoHTTPRedirector() } err = runHTTPS(listenAddr, setting.CertFile, setting.KeyFile, context2.ClearHandler(m)) case setting.FCGI: + NoHTTPRedirector() + // FCGI listeners are provided as stdin - this is orthogonal to the LISTEN_FDS approach + // in graceful and systemD + NoMainListener() var listener net.Listener listener, err = net.Listen("tcp", listenAddr) if err != nil { @@ -187,6 +199,10 @@ func runWeb(ctx *cli.Context) error { }() err = fcgi.Serve(listener, context2.ClearHandler(m)) case setting.UnixSocket: + // This could potentially be inherited using LISTEN_FDS but currently + // these cannot be inherited + NoHTTPRedirector() + NoMainListener() if err := os.Remove(listenAddr); err != nil && !os.IsNotExist(err) { log.Fatal("Failed to remove unix socket directory %s: %v", listenAddr, err) } @@ -207,8 +223,9 @@ func runWeb(ctx *cli.Context) error { } if err != nil { - log.Fatal("Failed to start server: %v", err) + log.Critical("Failed to start server: %v", err) } - + log.Info("HTTP Listener: %s Closed", listenAddr) + log.Close() return nil } diff --git a/cmd/web_graceful.go b/cmd/web_graceful.go index 53f407ce9e..07b5a964c5 100644 --- a/cmd/web_graceful.go +++ b/cmd/web_graceful.go @@ -10,36 +10,28 @@ import ( "crypto/tls" "net/http" - "code.gitea.io/gitea/modules/log" - - "github.com/facebookgo/grace/gracehttp" + "code.gitea.io/gitea/modules/graceful" ) func runHTTP(listenAddr string, m http.Handler) error { - return gracehttp.Serve(&http.Server{ - Addr: listenAddr, - Handler: m, - }) + return graceful.HTTPListenAndServe("tcp", listenAddr, m) } func runHTTPS(listenAddr, certFile, keyFile string, m http.Handler) error { - config := &tls.Config{ - MinVersion: tls.VersionTLS10, - } - if config.NextProtos == nil { - config.NextProtos = []string{"http/1.1"} - } - - config.Certificates = make([]tls.Certificate, 1) - var err error - config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - log.Fatal("Failed to load https cert file %s: %v", listenAddr, err) - } - - return gracehttp.Serve(&http.Server{ - Addr: listenAddr, - Handler: m, - TLSConfig: config, - }) + return graceful.HTTPListenAndServeTLS("tcp", listenAddr, certFile, keyFile, m) +} + +func runHTTPSWithTLSConfig(listenAddr string, tlsConfig *tls.Config, m http.Handler) error { + return graceful.HTTPListenAndServeTLSConfig("tcp", listenAddr, tlsConfig, m) +} + +// NoHTTPRedirector tells our cleanup routine that we will not be using a fallback http redirector +func NoHTTPRedirector() { + graceful.InformCleanup() +} + +// NoMainListener tells our cleanup routine that we will not be using a possibly provided listener +// for our main HTTP/HTTPS service +func NoMainListener() { + graceful.InformCleanup() } diff --git a/cmd/web_windows.go b/cmd/web_windows.go index 0fc6cbea0d..cdd2cc513b 100644 --- a/cmd/web_windows.go +++ b/cmd/web_windows.go @@ -7,6 +7,7 @@ package cmd import ( + "crypto/tls" "net/http" ) @@ -17,3 +18,20 @@ func runHTTP(listenAddr string, m http.Handler) error { func runHTTPS(listenAddr, certFile, keyFile string, m http.Handler) error { return http.ListenAndServeTLS(listenAddr, certFile, keyFile, m) } + +func runHTTPSWithTLSConfig(listenAddr string, tlsConfig *tls.Config, m http.Handler) error { + server := &http.Server{ + Addr: listenAddr, + Handler: m, + TLSConfig: tlsConfig, + } + return server.ListenAndServeTLS("", "") +} + +// NoHTTPRedirector is a no-op on Windows +func NoHTTPRedirector() { +} + +// NoMainListener is a no-op on Windows +func NoMainListener() { +} diff --git a/custom/conf/app.ini.sample b/custom/conf/app.ini.sample index 5ee2162ba3..08a2c8e32f 100644 --- a/custom/conf/app.ini.sample +++ b/custom/conf/app.ini.sample @@ -244,6 +244,12 @@ LFS_CONTENT_PATH = data/lfs LFS_JWT_SECRET = ; LFS authentication validity period (in time.Duration), pushes taking longer than this may fail. LFS_HTTP_AUTH_EXPIRY = 20m +; Allow graceful restarts using SIGHUP to fork +ALLOW_GRACEFUL_RESTARTS = true +; After a restart the parent will finish ongoing requests before +; shutting down. Force shutdown if this process takes longer than this delay. +; set to a negative value to disable +GRACEFUL_HAMMER_TIME = 60s ; Static resources, includes resources on custom/, public/ and all uploaded avatars web browser cache time, default is 6h STATIC_CACHE_TIME = 6h @@ -299,6 +305,9 @@ ISSUE_INDEXER_QUEUE_DIR = indexers/issues.queue ISSUE_INDEXER_QUEUE_CONN_STR = "addrs=127.0.0.1:6379 db=0" ; Batch queue number, default is 20 ISSUE_INDEXER_QUEUE_BATCH_NUMBER = 20 +; Timeout the indexer if it takes longer than this to start. +; Set to zero to disable timeout. +STARTUP_TIMEOUT=30s ; repo indexer by default disabled, since it uses a lot of disk space REPO_INDEXER_ENABLED = false diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 29a971da12..6fb92f2721 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -157,6 +157,8 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`. - `LETSENCRYPT_ACCEPTTOS`: **false**: This is an explicit check that you accept the terms of service for Let's Encrypt. - `LETSENCRYPT_DIRECTORY`: **https**: Directory that Letsencrypt will use to cache information such as certs and private keys. - `LETSENCRYPT_EMAIL`: **email@example.com**: Email used by Letsencrypt to notify about problems with issued certificates. (No default) +- `ALLOW_GRACEFUL_RESTARTS`: **true**: Perform a graceful restart on SIGHUP +- `GRACEFUL_HAMMER_TIME`: **60s**: After a restart the parent process will stop accepting new connections and will allow requests to finish before stopping. Shutdown will be forced if it takes longer than this time. ## Database (`database`) @@ -189,6 +191,7 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`. - `REPO_INDEXER_EXCLUDE`: **empty**: A comma separated list of glob patterns (see https://github.com/gobwas/glob) to **exclude** from the index. Files that match this list will not be indexed, even if they match in `REPO_INDEXER_INCLUDE`. - `UPDATE_BUFFER_LEN`: **20**: Buffer length of index request. - `MAX_FILE_SIZE`: **1048576**: Maximum size in bytes of files to be indexed. +- `STARTUP_TIMEOUT`: **30s**: If the indexer takes longer than this timeout to start - fail. (This timeout will be added to the hammer time above for child processes - as bleve will not start until the previous parent is shutdown.) Set to zero to never timeout. ## Admin (`admin`) - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled @@ -32,13 +32,8 @@ require ( github.com/emirpasic/gods v1.12.0 github.com/etcd-io/bbolt v1.3.2 // indirect github.com/ethantkoenig/rupture v0.0.0-20180203182544-0a76f03a811a - github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51 // indirect - github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9 // indirect - github.com/facebookgo/grace v0.0.0-20160926231715-5729e484473f - github.com/facebookgo/httpdown v0.0.0-20160323221027-a3b1354551a2 // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect - github.com/facebookgo/stats v0.0.0-20151006221625-1b76add642e4 // indirect github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 // indirect github.com/gliderlabs/ssh v0.2.2 github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd // indirect @@ -142,20 +142,10 @@ github.com/etcd-io/bbolt v1.3.2 h1:RLRQ0TKLX7DlBRXAJHvbmXL17Q3KNnTBtZ9B6Qo+/Y0= github.com/etcd-io/bbolt v1.3.2/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethantkoenig/rupture v0.0.0-20180203182544-0a76f03a811a h1:M1bRpaZAn4GSsqu3hdK2R8H0AH9O6vqCTCbm2oAFGfE= github.com/ethantkoenig/rupture v0.0.0-20180203182544-0a76f03a811a/go.mod h1:MkKY/CB98aVE4VxO63X5vTQKUgcn+3XP15LMASe3lYs= -github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw= -github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA= github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51 h1:0JZ+dUmQeA8IIVUMzysrX4/AKuQwWhV2dYQuPZdvdSQ= github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= -github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9 h1:wWke/RUCl7VRjQhwPlR/v0glZXNYzBHdNUzf/Am2Nmg= -github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9/go.mod h1:uPmAp6Sws4L7+Q/OokbWDAK1ibXYhB3PXFP1kol5hPg= -github.com/facebookgo/grace v0.0.0-20160926231715-5729e484473f h1:0mlfEUWnUDVZnqWEVHGerL5bKYDKMEmT/Qk/W/3nGuo= -github.com/facebookgo/grace v0.0.0-20160926231715-5729e484473f/go.mod h1:KigFdumBXUPSwzLDbeuzyt0elrL7+CP7TKuhrhT4bcU= -github.com/facebookgo/httpdown v0.0.0-20160323221027-a3b1354551a2 h1:3Zvf9wRhl1cOhckN1oRGWPOkIhOketmEcrQ4TeFAoR4= -github.com/facebookgo/httpdown v0.0.0-20160323221027-a3b1354551a2/go.mod h1:TUV/fX3XrTtBQb5+ttSUJzcFgLNpILONFTKmBuk5RSw= github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= -github.com/facebookgo/stats v0.0.0-20151006221625-1b76add642e4 h1:0YtRCqIZs2+Tz49QuH6cJVw/IFqzo39gEqZ0iYLxD2M= -github.com/facebookgo/stats v0.0.0-20151006221625-1b76add642e4/go.mod h1:vsJz7uE339KUCpBXx3JAJzSRH7Uk4iGGyJzR529qDIA= github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 h1:E2s37DuLxFhQDg5gKsWoLBOB0n+ZW8s599zru8FJ2/Y= github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= diff --git a/models/repo_indexer.go b/models/repo_indexer.go index b842a1c87f..9cc002a8ab 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -8,10 +8,12 @@ import ( "fmt" "strconv" "strings" + "time" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -70,9 +72,30 @@ func InitRepoIndexer() { if !setting.Indexer.RepoIndexerEnabled { return } + waitChannel := make(chan time.Duration) repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) - indexer.InitRepoIndexer(populateRepoIndexerAsynchronously) - go processRepoIndexerOperationQueue() + go func() { + start := time.Now() + log.Info("Initializing Repository Indexer") + indexer.InitRepoIndexer(populateRepoIndexerAsynchronously) + go processRepoIndexerOperationQueue() + waitChannel <- time.Since(start) + }() + if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.IsChild && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Repository Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) + } + }() + + } } // populateRepoIndexerAsynchronously asynchronously populates the repo indexer diff --git a/modules/graceful/cleanup.go b/modules/graceful/cleanup.go new file mode 100644 index 0000000000..1de087a999 --- /dev/null +++ b/modules/graceful/cleanup.go @@ -0,0 +1,38 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import "sync" + +var cleanupWaitGroup sync.WaitGroup + +func init() { + cleanupWaitGroup = sync.WaitGroup{} + + // There are three places that could inherit sockets: + // + // * HTTP or HTTPS main listener + // * HTTP redirection fallback + // * SSH + // + // If you add an additional place you must increment this number + // and add a function to call InformCleanup if it's not going to be used + cleanupWaitGroup.Add(3) + + // Wait till we're done getting all of the listeners and then close + // the unused ones + go func() { + cleanupWaitGroup.Wait() + // Ignore the error here there's not much we can do with it + // They're logged in the CloseProvidedListeners function + _ = CloseProvidedListeners() + }() +} + +// InformCleanup tells the cleanup wait group that we have either taken a listener +// or will not be taking a listener +func InformCleanup() { + cleanupWaitGroup.Done() +} diff --git a/modules/graceful/net.go b/modules/graceful/net.go new file mode 100644 index 0000000000..f2612e21be --- /dev/null +++ b/modules/graceful/net.go @@ -0,0 +1,209 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +// This code is heavily inspired by the archived gofacebook/gracenet/net.go handler + +package graceful + +import ( + "fmt" + "net" + "os" + "strconv" + "strings" + "sync" + + "code.gitea.io/gitea/modules/log" +) + +const ( + listenFDs = "LISTEN_FDS" + startFD = 3 +) + +// In order to keep the working directory the same as when we started we record +// it at startup. +var originalWD, _ = os.Getwd() + +var ( + once = sync.Once{} + mutex = sync.Mutex{} + + providedListeners = []net.Listener{} + activeListeners = []net.Listener{} +) + +func getProvidedFDs() (savedErr error) { + // Only inherit the provided FDS once but we will save the error so that repeated calls to this function will return the same error + once.Do(func() { + mutex.Lock() + defer mutex.Unlock() + + numFDs := os.Getenv(listenFDs) + if numFDs == "" { + return + } + n, err := strconv.Atoi(numFDs) + if err != nil { + savedErr = fmt.Errorf("%s is not a number: %s. Err: %v", listenFDs, numFDs, err) + return + } + + for i := startFD; i < n+startFD; i++ { + file := os.NewFile(uintptr(i), fmt.Sprintf("listener_FD%d", i)) + + l, err := net.FileListener(file) + if err == nil { + // Close the inherited file if it's a listener + if err = file.Close(); err != nil { + savedErr = fmt.Errorf("error closing provided socket fd %d: %s", i, err) + return + } + providedListeners = append(providedListeners, l) + continue + } + + // If needed we can handle packetconns here. + savedErr = fmt.Errorf("Error getting provided socket fd %d: %v", i, err) + return + } + }) + return savedErr +} + +// CloseProvidedListeners closes all unused provided listeners. +func CloseProvidedListeners() error { + mutex.Lock() + defer mutex.Unlock() + var returnableError error + for _, l := range providedListeners { + err := l.Close() + if err != nil { + log.Error("Error in closing unused provided listener: %v", err) + if returnableError != nil { + returnableError = fmt.Errorf("%v & %v", returnableError, err) + } else { + returnableError = err + } + } + } + providedListeners = []net.Listener{} + + return returnableError +} + +// GetListener obtains a listener for the local network address. The network must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It +// returns an provided net.Listener for the matching network and address, or +// creates a new one using net.Listen. +func GetListener(network, address string) (net.Listener, error) { + // Add a deferral to say that we've tried to grab a listener + defer InformCleanup() + switch network { + case "tcp", "tcp4", "tcp6": + tcpAddr, err := net.ResolveTCPAddr(network, address) + if err != nil { + return nil, err + } + return GetListenerTCP(network, tcpAddr) + case "unix", "unixpacket": + unixAddr, err := net.ResolveUnixAddr(network, address) + if err != nil { + return nil, err + } + return GetListenerUnix(network, unixAddr) + default: + return nil, net.UnknownNetworkError(network) + } +} + +// GetListenerTCP announces on the local network address. The network must be: +// "tcp", "tcp4" or "tcp6". It returns a provided net.Listener for the +// matching network and address, or creates a new one using net.ListenTCP. +func GetListenerTCP(network string, address *net.TCPAddr) (*net.TCPListener, error) { + if err := getProvidedFDs(); err != nil { + return nil, err + } + + mutex.Lock() + defer mutex.Unlock() + + // look for a provided listener + for i, l := range providedListeners { + if isSameAddr(l.Addr(), address) { + providedListeners = append(providedListeners[:i], providedListeners[i+1:]...) + + activeListeners = append(activeListeners, l) + return l.(*net.TCPListener), nil + } + } + + // no provided listener for this address -> make a fresh listener + l, err := net.ListenTCP(network, address) + if err != nil { + return nil, err + } + activeListeners = append(activeListeners, l) + return l, nil +} + +// GetListenerUnix announces on the local network address. The network must be: +// "unix" or "unixpacket". It returns a provided net.Listener for the +// matching network and address, or creates a new one using net.ListenUnix. +func GetListenerUnix(network string, address *net.UnixAddr) (*net.UnixListener, error) { + if err := getProvidedFDs(); err != nil { + return nil, err + } + + mutex.Lock() + defer mutex.Unlock() + + // look for a provided listener + for i, l := range providedListeners { + if isSameAddr(l.Addr(), address) { + providedListeners = append(providedListeners[:i], providedListeners[i+1:]...) + activeListeners = append(activeListeners, l) + return l.(*net.UnixListener), nil + } + } + + // make a fresh listener + l, err := net.ListenUnix(network, address) + if err != nil { + return nil, err + } + activeListeners = append(activeListeners, l) + return l, nil +} + +func isSameAddr(a1, a2 net.Addr) bool { + // If the addresses are not on the same network fail. + if a1.Network() != a2.Network() { + return false + } + + // If the two addresses have the same string representation they're equal + a1s := a1.String() + a2s := a2.String() + if a1s == a2s { + return true + } + + // This allows for ipv6 vs ipv4 local addresses to compare as equal. This + // scenario is common when listening on localhost. + const ipv6prefix = "[::]" + a1s = strings.TrimPrefix(a1s, ipv6prefix) + a2s = strings.TrimPrefix(a2s, ipv6prefix) + const ipv4prefix = "0.0.0.0" + a1s = strings.TrimPrefix(a1s, ipv4prefix) + a2s = strings.TrimPrefix(a2s, ipv4prefix) + return a1s == a2s +} + +func getActiveListeners() []net.Listener { + mutex.Lock() + defer mutex.Unlock() + listeners := make([]net.Listener, len(activeListeners)) + copy(listeners, activeListeners) + return listeners +} diff --git a/modules/graceful/restart.go b/modules/graceful/restart.go new file mode 100644 index 0000000000..33b3c4d417 --- /dev/null +++ b/modules/graceful/restart.go @@ -0,0 +1,67 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +// This code is heavily inspired by the archived gofacebook/gracenet/net.go handler + +package graceful + +import ( + "fmt" + "os" + "os/exec" + "strings" +) + +// RestartProcess starts a new process passing it the active listeners. It +// doesn't fork, but starts a new process using the same environment and +// arguments as when it was originally started. This allows for a newly +// deployed binary to be started. It returns the pid of the newly started +// process when successful. +func RestartProcess() (int, error) { + listeners := getActiveListeners() + + // Extract the fds from the listeners. + files := make([]*os.File, len(listeners)) + for i, l := range listeners { + var err error + // Now, all our listeners actually have File() functions so instead of + // individually casting we just use a hacky interface + files[i], err = l.(filer).File() + if err != nil { + return 0, err + } + // Remember to close these at the end. + defer files[i].Close() + } + + // Use the original binary location. This works with symlinks such that if + // the file it points to has been changed we will use the updated symlink. + argv0, err := exec.LookPath(os.Args[0]) + if err != nil { + return 0, err + } + + // Pass on the environment and replace the old count key with the new one. + var env []string + for _, v := range os.Environ() { + if !strings.HasPrefix(v, listenFDs+"=") { + env = append(env, v) + } + } + env = append(env, fmt.Sprintf("%s=%d", listenFDs, len(listeners))) + + allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) + process, err := os.StartProcess(argv0, os.Args, &os.ProcAttr{ + Dir: originalWD, + Env: env, + Files: allFiles, + }) + if err != nil { + return 0, err + } + return process.Pid, nil +} + +type filer interface { + File() (*os.File, error) +} diff --git a/modules/graceful/server.go b/modules/graceful/server.go new file mode 100644 index 0000000000..efe8b264b3 --- /dev/null +++ b/modules/graceful/server.go @@ -0,0 +1,267 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +// This code is highly inspired by endless go + +package graceful + +import ( + "crypto/tls" + "net" + "os" + "strings" + "sync" + "syscall" + "time" + + "code.gitea.io/gitea/modules/log" +) + +type state uint8 + +const ( + stateInit state = iota + stateRunning + stateShuttingDown + stateTerminate +) + +var ( + // RWMutex for when adding servers or shutting down + runningServerReg sync.RWMutex + // ensure we only fork once + runningServersForked bool + + // DefaultReadTimeOut default read timeout + DefaultReadTimeOut time.Duration + // DefaultWriteTimeOut default write timeout + DefaultWriteTimeOut time.Duration + // DefaultMaxHeaderBytes default max header bytes + DefaultMaxHeaderBytes int + + // IsChild reports if we are a fork iff LISTEN_FDS is set and our parent PID is not 1 + IsChild = len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1 +) + +func init() { + runningServerReg = sync.RWMutex{} + + DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB) +} + +// ServeFunction represents a listen.Accept loop +type ServeFunction = func(net.Listener) error + +// Server represents our graceful server +type Server struct { + network string + address string + listener net.Listener + PreSignalHooks map[os.Signal][]func() + PostSignalHooks map[os.Signal][]func() + wg sync.WaitGroup + sigChan chan os.Signal + state state + lock *sync.RWMutex + BeforeBegin func(network, address string) + OnShutdown func() +} + +// NewServer creates a server on network at provided address +func NewServer(network, address string) *Server { + runningServerReg.Lock() + defer runningServerReg.Unlock() + + if IsChild { + log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid()) + } else { + log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid()) + } + srv := &Server{ + wg: sync.WaitGroup{}, + sigChan: make(chan os.Signal), + PreSignalHooks: map[os.Signal][]func(){}, + PostSignalHooks: map[os.Signal][]func(){}, + state: stateInit, + lock: &sync.RWMutex{}, + network: network, + address: address, + } + + srv.BeforeBegin = func(network, addr string) { + log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid()) + } + + return srv +} + +// ListenAndServe listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func (srv *Server) ListenAndServe(serve ServeFunction) error { + go srv.handleSignals() + + l, err := GetListener(srv.network, srv.address) + if err != nil { + log.Error("Unable to GetListener: %v", err) + return err + } + + srv.listener = newWrappedListener(l, srv) + + if IsChild { + _ = syscall.Kill(syscall.Getppid(), syscall.SIGTERM) + } + + srv.BeforeBegin(srv.network, srv.address) + + return srv.Serve(serve) +} + +// ListenAndServeTLS listens on the provided network address and then calls +// Serve to handle requests on incoming TLS connections. +// +// Filenames containing a certificate and matching private key for the server must +// be provided. If the certificate is signed by a certificate authority, the +// certFile should be the concatenation of the server's certificate followed by the +// CA's certificate. +func (srv *Server) ListenAndServeTLS(certFile, keyFile string, serve ServeFunction) error { + config := &tls.Config{} + if config.NextProtos == nil { + config.NextProtos = []string{"http/1.1"} + } + + config.Certificates = make([]tls.Certificate, 1) + var err error + config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + log.Error("Failed to load https cert file %s for %s:%s: %v", certFile, srv.network, srv.address, err) + return err + } + return srv.ListenAndServeTLSConfig(config, serve) +} + +// ListenAndServeTLSConfig listens on the provided network address and then calls +// Serve to handle requests on incoming TLS connections. +func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFunction) error { + go srv.handleSignals() + + l, err := GetListener(srv.network, srv.address) + if err != nil { + log.Error("Unable to get Listener: %v", err) + return err + } + + wl := newWrappedListener(l, srv) + srv.listener = tls.NewListener(wl, tlsConfig) + + if IsChild { + _ = syscall.Kill(syscall.Getppid(), syscall.SIGTERM) + } + srv.BeforeBegin(srv.network, srv.address) + + return srv.Serve(serve) +} + +// Serve accepts incoming HTTP connections on the wrapped listener l, creating a new +// service goroutine for each. The service goroutines read requests and then call +// handler to reply to them. Handler is typically nil, in which case the +// DefaultServeMux is used. +// +// In addition to the standard Serve behaviour each connection is added to a +// sync.Waitgroup so that all outstanding connections can be served before shutting +// down the server. +func (srv *Server) Serve(serve ServeFunction) error { + defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid()) + srv.setState(stateRunning) + err := serve(srv.listener) + log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid()) + srv.wg.Wait() + srv.setState(stateTerminate) + // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil + if err != nil && strings.Contains(err.Error(), "use of closed") { + return nil + } + return err +} + +func (srv *Server) getState() state { + srv.lock.RLock() + defer srv.lock.RUnlock() + + return srv.state +} + +func (srv *Server) setState(st state) { + srv.lock.Lock() + defer srv.lock.Unlock() + + srv.state = st +} + +type wrappedListener struct { + net.Listener + stopped bool + server *Server +} + +func newWrappedListener(l net.Listener, srv *Server) *wrappedListener { + return &wrappedListener{ + Listener: l, + server: srv, + } +} + +func (wl *wrappedListener) Accept() (net.Conn, error) { + var c net.Conn + // Set keepalive on TCPListeners connections. + if tcl, ok := wl.Listener.(*net.TCPListener); ok { + tc, err := tcl.AcceptTCP() + if err != nil { + return nil, err + } + _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener + _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener + c = tc + } else { + var err error + c, err = wl.Listener.Accept() + if err != nil { + return nil, err + } + } + + c = wrappedConn{ + Conn: c, + server: wl.server, + } + + wl.server.wg.Add(1) + return c, nil +} + +func (wl *wrappedListener) Close() error { + if wl.stopped { + return syscall.EINVAL + } + + wl.stopped = true + return wl.Listener.Close() +} + +func (wl *wrappedListener) File() (*os.File, error) { + // returns a dup(2) - FD_CLOEXEC flag *not* set so the listening socket can be passed to child processes + return wl.Listener.(filer).File() +} + +type wrappedConn struct { + net.Conn + server *Server +} + +func (w wrappedConn) Close() error { + err := w.Conn.Close() + if err == nil { + w.server.wg.Done() + } + return err +} diff --git a/modules/graceful/server_hooks.go b/modules/graceful/server_hooks.go new file mode 100644 index 0000000000..a80d955556 --- /dev/null +++ b/modules/graceful/server_hooks.go @@ -0,0 +1,119 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import ( + "errors" + "fmt" + "os" + "runtime" + "time" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +// shutdown closes the listener so that no new connections are accepted +// and starts a goroutine that will hammer (stop all running requests) the server +// after setting.GracefulHammerTime. +func (srv *Server) shutdown() { + // only shutdown if we're running. + if srv.getState() != stateRunning { + return + } + + srv.setState(stateShuttingDown) + if setting.GracefulHammerTime >= 0 { + go srv.hammerTime(setting.GracefulHammerTime) + } + + if srv.OnShutdown != nil { + srv.OnShutdown() + } + err := srv.listener.Close() + if err != nil { + log.Error("PID: %d Listener.Close() error: %v", os.Getpid(), err) + } else { + log.Info("PID: %d Listener (%s) closed.", os.Getpid(), srv.listener.Addr()) + } +} + +// hammerTime forces the server to shutdown in a given timeout - whether it +// finished outstanding requests or not. if Read/WriteTimeout are not set or the +// max header size is very big a connection could hang... +// +// srv.Serve() will not return until all connections are served. this will +// unblock the srv.wg.Wait() in Serve() thus causing ListenAndServe* functions to +// return. +func (srv *Server) hammerTime(d time.Duration) { + defer func() { + // We call srv.wg.Done() until it panics. + // This happens if we call Done() when the WaitGroup counter is already at 0 + // So if it panics -> we're done, Serve() will return and the + // parent will goroutine will exit. + if r := recover(); r != nil { + log.Error("WaitGroup at 0: Error: %v", r) + } + }() + if srv.getState() != stateShuttingDown { + return + } + time.Sleep(d) + log.Warn("Forcefully shutting down parent") + for { + if srv.getState() == stateTerminate { + break + } + srv.wg.Done() + + // Give other goroutines a chance to finish before we forcibly stop them. + runtime.Gosched() + } +} + +func (srv *Server) fork() error { + runningServerReg.Lock() + defer runningServerReg.Unlock() + + // only one server instance should fork! + if runningServersForked { + return errors.New("another process already forked. Ignoring this one") + } + + runningServersForked = true + + // We need to move the file logs to append pids + setting.RestartLogsWithPIDSuffix() + + _, err := RestartProcess() + + return err +} + +// RegisterPreSignalHook registers a function to be run before the signal handler for +// a given signal. These are not mutex locked and should therefore be only called before Serve. +func (srv *Server) RegisterPreSignalHook(sig os.Signal, f func()) (err error) { + for _, s := range hookableSignals { + if s == sig { + srv.PreSignalHooks[sig] = append(srv.PreSignalHooks[sig], f) + return + } + } + err = fmt.Errorf("Signal %v is not supported", sig) + return +} + +// RegisterPostSignalHook registers a function to be run after the signal handler for +// a given signal. These are not mutex locked and should therefore be only called before Serve. +func (srv *Server) RegisterPostSignalHook(sig os.Signal, f func()) (err error) { + for _, s := range hookableSignals { + if s == sig { + srv.PostSignalHooks[sig] = append(srv.PostSignalHooks[sig], f) + return + } + } + err = fmt.Errorf("Signal %v is not supported", sig) + return +} diff --git a/modules/graceful/server_http.go b/modules/graceful/server_http.go new file mode 100644 index 0000000000..1052637d5e --- /dev/null +++ b/modules/graceful/server_http.go @@ -0,0 +1,45 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import ( + "crypto/tls" + "net/http" +) + +func newHTTPServer(network, address string, handler http.Handler) (*Server, ServeFunction) { + server := NewServer(network, address) + httpServer := http.Server{ + ReadTimeout: DefaultReadTimeOut, + WriteTimeout: DefaultWriteTimeOut, + MaxHeaderBytes: DefaultMaxHeaderBytes, + Handler: handler, + } + server.OnShutdown = func() { + httpServer.SetKeepAlivesEnabled(false) + } + return server, httpServer.Serve +} + +// HTTPListenAndServe listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func HTTPListenAndServe(network, address string, handler http.Handler) error { + server, lHandler := newHTTPServer(network, address, handler) + return server.ListenAndServe(lHandler) +} + +// HTTPListenAndServeTLS listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func HTTPListenAndServeTLS(network, address, certFile, keyFile string, handler http.Handler) error { + server, lHandler := newHTTPServer(network, address, handler) + return server.ListenAndServeTLS(certFile, keyFile, lHandler) +} + +// HTTPListenAndServeTLSConfig listens on the provided network address and then calls Serve +// to handle requests on incoming connections. +func HTTPListenAndServeTLSConfig(network, address string, tlsConfig *tls.Config, handler http.Handler) error { + server, lHandler := newHTTPServer(network, address, handler) + return server.ListenAndServeTLSConfig(tlsConfig, lHandler) +} diff --git a/modules/graceful/server_signals.go b/modules/graceful/server_signals.go new file mode 100644 index 0000000000..ea76b5509c --- /dev/null +++ b/modules/graceful/server_signals.go @@ -0,0 +1,93 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graceful + +import ( + "os" + "os/signal" + "syscall" + "time" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +var hookableSignals []os.Signal + +func init() { + hookableSignals = []os.Signal{ + syscall.SIGHUP, + syscall.SIGUSR1, + syscall.SIGUSR2, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGTSTP, + } +} + +// handleSignals listens for os Signals and calls any hooked in function that the +// user had registered with the signal. +func (srv *Server) handleSignals() { + var sig os.Signal + + signal.Notify( + srv.sigChan, + hookableSignals..., + ) + + pid := syscall.Getpid() + for { + sig = <-srv.sigChan + srv.preSignalHooks(sig) + switch sig { + case syscall.SIGHUP: + if setting.GracefulRestartable { + log.Info("PID: %d. Received SIGHUP. Forking...", pid) + err := srv.fork() + if err != nil { + log.Error("Error whilst forking from PID: %d : %v", pid, err) + } + } else { + log.Info("PID: %d. Received SIGHUP. Not set restartable. Shutting down...", pid) + + srv.shutdown() + } + case syscall.SIGUSR1: + log.Info("PID %d. Received SIGUSR1.", pid) + case syscall.SIGUSR2: + log.Warn("PID %d. Received SIGUSR2. Hammering...", pid) + srv.hammerTime(0 * time.Second) + case syscall.SIGINT: + log.Warn("PID %d. Received SIGINT. Shutting down...", pid) + srv.shutdown() + case syscall.SIGTERM: + log.Warn("PID %d. Received SIGTERM. Shutting down...", pid) + srv.shutdown() + case syscall.SIGTSTP: + log.Info("PID %d. Received SIGTSTP.") + default: + log.Info("PID %d. Received %v.", sig) + } + srv.postSignalHooks(sig) + } +} + +func (srv *Server) preSignalHooks(sig os.Signal) { + if _, notSet := srv.PreSignalHooks[sig]; !notSet { + return + } + for _, f := range srv.PreSignalHooks[sig] { + f() + } +} + +func (srv *Server) postSignalHooks(sig os.Signal) { + if _, notSet := srv.PostSignalHooks[sig]; !notSet { + return + } + for _, f := range srv.PostSignalHooks[sig] { + f() + } +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index df8bfd6305..4f410daf4c 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,9 +5,11 @@ package issues import ( - "fmt" + "sync" + "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" @@ -45,78 +47,143 @@ type Indexer interface { Search(kw string, repoID int64, limit, start int) (*SearchResult, error) } +type indexerHolder struct { + indexer Indexer + mutex sync.RWMutex + cond *sync.Cond +} + +func newIndexerHolder() *indexerHolder { + h := &indexerHolder{} + h.cond = sync.NewCond(h.mutex.RLocker()) + return h +} + +func (h *indexerHolder) set(indexer Indexer) { + h.mutex.Lock() + defer h.mutex.Unlock() + h.indexer = indexer + h.cond.Broadcast() +} + +func (h *indexerHolder) get() Indexer { + h.mutex.RLock() + defer h.mutex.RUnlock() + if h.indexer == nil { + h.cond.Wait() + } + return h.indexer +} + var ( + issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength) // issueIndexerQueue queue of issue ids to be updated issueIndexerQueue Queue - issueIndexer Indexer + holder = newIndexerHolder() ) // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until // all issue index done. -func InitIssueIndexer(syncReindex bool) error { - var populate bool - var dummyQueue bool - switch setting.Indexer.IssueType { - case "bleve": - issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() - if err != nil { - return err +func InitIssueIndexer(syncReindex bool) { + waitChannel := make(chan time.Duration) + go func() { + start := time.Now() + log.Info("Initializing Issue Indexer") + var populate bool + var dummyQueue bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) + } + populate = !exist + holder.set(issueIndexer) + case "db": + issueIndexer := &DBIndexer{} + holder.set(issueIndexer) + dummyQueue = true + default: + log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } - populate = !exist - case "db": - issueIndexer = &DBIndexer{} - dummyQueue = true - default: - return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) - } - if dummyQueue { - issueIndexerQueue = &DummyQueue{} - return nil - } + if dummyQueue { + issueIndexerQueue = &DummyQueue{} + } else { + var err error + switch setting.Indexer.IssueQueueType { + case setting.LevelQueueType: + issueIndexerQueue, err = NewLevelQueue( + holder.get(), + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal( + "Unable create level queue for issue queue dir: %s batch number: %d : %v", + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber, + err) + } + case setting.ChannelQueueType: + issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber) + case setting.RedisQueueType: + addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) + if err != nil { + log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal("Unable to create RedisQueue: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + default: + log.Fatal("Unsupported indexer queue type: %v", + setting.Indexer.IssueQueueType) + } - var err error - switch setting.Indexer.IssueQueueType { - case setting.LevelQueueType: - issueIndexerQueue, err = NewLevelQueue( - issueIndexer, - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err + go func() { + err = issueIndexerQueue.Run() + if err != nil { + log.Error("issueIndexerQueue.Run: %v", err) + } + }() } - case setting.ChannelQueueType: - issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber) - case setting.RedisQueueType: - addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) - if err != nil { - return err - } - issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType) - } - go func() { - err = issueIndexerQueue.Run() - if err != nil { - log.Error("issueIndexerQueue.Run: %v", err) - } - }() + go func() { + for data := range issueIndexerChannel { + _ = issueIndexerQueue.Push(data) + } + }() - if populate { - if syncReindex { - populateIssueIndexer() - } else { - go populateIssueIndexer() + if populate { + if syncReindex { + populateIssueIndexer() + } else { + go populateIssueIndexer() + } } + waitChannel <- time.Since(start) + }() + if syncReindex { + <-waitChannel + } else if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.IsChild && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Issue Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) + } + }() } - - return nil } // populateIssueIndexer populate the issue indexer with issue data @@ -166,13 +233,13 @@ func UpdateIssueIndexer(issue *models.Issue) { comments = append(comments, comment.Content) } } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, Content: issue.Content, Comments: comments, - }) + } } // DeleteRepoIssueIndexer deletes repo's all issues indexes @@ -188,16 +255,16 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { return } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ IDs: ids, IsDelete: true, - }) + } } // SearchIssuesByKeyword search issue ids by keywords and repo id func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { var issueIDs []int64 - res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + res, err := holder.get().Search(keyword, repoID, 1000, 0) if err != nil { return nil, err } diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go index 59a7beed47..212c2edfbe 100644 --- a/modules/indexer/issues/indexer_test.go +++ b/modules/indexer/issues/indexer_test.go @@ -5,7 +5,6 @@ package issues import ( - "fmt" "os" "path/filepath" "testing" @@ -17,11 +16,6 @@ import ( "github.com/stretchr/testify/assert" ) -func fatalTestError(fmtStr string, args ...interface{}) { - fmt.Fprintf(os.Stderr, fmtStr, args...) - os.Exit(1) -} - func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..", "..")) } @@ -32,9 +26,7 @@ func TestBleveSearchIssues(t *testing.T) { os.RemoveAll(setting.Indexer.IssueQueueDir) os.RemoveAll(setting.Indexer.IssuePath) setting.Indexer.IssueType = "bleve" - if err := InitIssueIndexer(true); err != nil { - fatalTestError("Error InitIssueIndexer: %v\n", err) - } + InitIssueIndexer(true) time.Sleep(5 * time.Second) @@ -59,9 +51,7 @@ func TestDBSearchIssues(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) setting.Indexer.IssueType = "db" - if err := InitIssueIndexer(true); err != nil { - fatalTestError("Error InitIssueIndexer: %v\n", err) - } + InitIssueIndexer(true) ids, err := SearchIssuesByKeyword(1, "issue2") assert.NoError(t, err) diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go index 91ed173aa7..841f29acd7 100644 --- a/modules/indexer/repo.go +++ b/modules/indexer/repo.go @@ -6,6 +6,7 @@ package indexer import ( "strings" + "sync" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -25,8 +26,36 @@ const ( repoIndexerLatestVersion = 4 ) +type bleveIndexerHolder struct { + index bleve.Index + mutex sync.RWMutex + cond *sync.Cond +} + +func newBleveIndexerHolder() *bleveIndexerHolder { + b := &bleveIndexerHolder{} + b.cond = sync.NewCond(b.mutex.RLocker()) + return b +} + +func (r *bleveIndexerHolder) set(index bleve.Index) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.index = index + r.cond.Broadcast() +} + +func (r *bleveIndexerHolder) get() bleve.Index { + r.mutex.RLock() + defer r.mutex.RUnlock() + if r.index == nil { + r.cond.Wait() + } + return r.index +} + // repoIndexer (thread-safe) index for repository contents -var repoIndexer bleve.Index +var indexerHolder = newBleveIndexerHolder() // RepoIndexerOp type of operation to perform on repo indexer type RepoIndexerOp int @@ -73,12 +102,12 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) // InitRepoIndexer initialize repo indexer func InitRepoIndexer(populateIndexer func() error) { - var err error - repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { log.Fatal("InitRepoIndexer: %v", err) } - if repoIndexer != nil { + if indexer != nil { + indexerHolder.set(indexer) return } @@ -92,7 +121,6 @@ func InitRepoIndexer(populateIndexer func() error) { // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) error { - var err error docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -103,9 +131,9 @@ func createRepoIndexer(path string, latestVersion int) error { docMapping.AddFieldMappingsAt("Content", textFieldMapping) mapping := bleve.NewIndexMapping() - if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { return err - } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ "type": custom.Name, "char_filters": []string{}, "tokenizer": unicode.Name, @@ -117,10 +145,12 @@ func createRepoIndexer(path string, latestVersion int) error { mapping.AddDocumentMapping(repoIndexerDocType, docMapping) mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - repoIndexer, err = bleve.New(path, mapping) + indexer, err := bleve.New(path, mapping) if err != nil { return err } + indexerHolder.set(indexer) + return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ Version: latestVersion, }) @@ -140,14 +170,14 @@ func filenameOfIndexerID(indexerID string) string { // RepoIndexerBatch batch to add updates to func RepoIndexerBatch() rupture.FlushingBatch { - return rupture.NewFlushingBatch(repoIndexer, maxBatchSize) + return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) } // DeleteRepoFromIndexer delete all of a repo's files from indexer func DeleteRepoFromIndexer(repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return err } @@ -196,7 +226,7 @@ func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (i searchRequest.Fields = []string{"Content", "RepoID"} searchRequest.IncludeLocations = true - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return 0, nil, err } diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go index 30c670d407..fbaef3fcf2 100644 --- a/modules/setting/indexer.go +++ b/modules/setting/indexer.go @@ -8,6 +8,7 @@ import ( "path" "path/filepath" "strings" + "time" "code.gitea.io/gitea/modules/log" @@ -34,6 +35,7 @@ var ( IssueQueueDir string IssueQueueConnStr string IssueQueueBatchNumber int + StartupTimeout time.Duration IncludePatterns []glob.Glob ExcludePatterns []glob.Glob }{ @@ -67,6 +69,7 @@ func newIndexerService() { Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue")) Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, "")) Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20) + Indexer.StartupTimeout = sec.Key("STARTUP_TIMEOUT").MustDuration(30 * time.Second) } // IndexerGlobFromString parses a comma separated list of patterns and returns a glob.Glob slice suited for repo indexing diff --git a/modules/setting/log.go b/modules/setting/log.go index 5e2d2d769d..cb8f142084 100644 --- a/modules/setting/log.go +++ b/modules/setting/log.go @@ -6,6 +6,7 @@ package setting import ( "encoding/json" + "fmt" golog "log" "os" "path" @@ -17,6 +18,8 @@ import ( ini "gopkg.in/ini.v1" ) +var filenameSuffix = "" + type defaultLogOptions struct { levelName string // LogLevel flags string @@ -112,7 +115,7 @@ func generateLogConfig(sec *ini.Section, name string, defaults defaultLogOptions panic(err.Error()) } - logConfig["filename"] = logPath + logConfig["filename"] = logPath + filenameSuffix logConfig["rotate"] = sec.Key("LOG_ROTATE").MustBool(true) logConfig["maxsize"] = 1 << uint(sec.Key("MAX_SIZE_SHIFT").MustInt(28)) logConfig["daily"] = sec.Key("DAILY_ROTATE").MustBool(true) @@ -277,6 +280,12 @@ func newLogService() { golog.SetOutput(log.NewLoggerAsWriter("INFO", log.GetLogger(log.DEFAULT))) } +// RestartLogsWithPIDSuffix restarts the logs with a PID suffix on files +func RestartLogsWithPIDSuffix() { + filenameSuffix = fmt.Sprintf(".%d", os.Getpid()) + NewLogServices(false) +} + // NewLogServices creates all the log services func NewLogServices(disableConsole bool) { newLogService() diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 629a89766f..144882976f 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -97,6 +97,8 @@ var ( LetsEncryptTOS bool LetsEncryptDirectory string LetsEncryptEmail string + GracefulRestartable bool + GracefulHammerTime time.Duration SSH = struct { Disabled bool `ini:"DISABLE_SSH"` @@ -563,6 +565,8 @@ func NewContext() { Domain = sec.Key("DOMAIN").MustString("localhost") HTTPAddr = sec.Key("HTTP_ADDR").MustString("0.0.0.0") HTTPPort = sec.Key("HTTP_PORT").MustString("3000") + GracefulRestartable = sec.Key("ALLOW_GRACEFUL_RESTARTS").MustBool(true) + GracefulHammerTime = sec.Key("GRACEFUL_HAMMER_TIME").MustDuration(60 * time.Second) defaultAppURL := string(Protocol) + "://" + Domain if (Protocol == HTTP && HTTPPort != "80") || (Protocol == HTTPS && HTTPPort != "443") { diff --git a/modules/ssh/ssh.go b/modules/ssh/ssh.go index 7ff0c32326..e7a694683a 100644 --- a/modules/ssh/ssh.go +++ b/modules/ssh/ssh.go @@ -183,12 +183,7 @@ func Listen(host string, port int, ciphers []string, keyExchanges []string, macs log.Error("Failed to set Host Key. %s", err) } - go func() { - err := srv.ListenAndServe() - if err != nil { - log.Error("Failed to serve with builtin SSH server. %s", err) - } - }() + go listen(&srv) } diff --git a/modules/ssh/ssh_graceful.go b/modules/ssh/ssh_graceful.go new file mode 100644 index 0000000000..d66c7d6540 --- /dev/null +++ b/modules/ssh/ssh_graceful.go @@ -0,0 +1,30 @@ +// +build !windows + +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package ssh + +import ( + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + + "github.com/gliderlabs/ssh" +) + +func listen(server *ssh.Server) { + gracefulServer := graceful.NewServer("tcp", server.Addr) + + err := gracefulServer.ListenAndServe(server.Serve) + if err != nil { + log.Critical("Failed to start SSH server: %v", err) + } + log.Info("SSH Listener: %s Closed", server.Addr) + +} + +// Unused informs our cleanup routine that we will not be using a ssh port +func Unused() { + graceful.InformCleanup() +} diff --git a/modules/ssh/ssh_windows.go b/modules/ssh/ssh_windows.go new file mode 100644 index 0000000000..55032e17cd --- /dev/null +++ b/modules/ssh/ssh_windows.go @@ -0,0 +1,24 @@ +// +build windows + +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package ssh + +import ( + "code.gitea.io/gitea/modules/log" + "github.com/gliderlabs/ssh" +) + +func listen(server *ssh.Server) { + err := server.ListenAndServe() + if err != nil { + log.Critical("Failed to serve with builtin SSH server. %s", err) + } +} + +// Unused does nothing on windows +func Unused() { + // Do nothing +} diff --git a/routers/init.go b/routers/init.go index c37bbeb6b0..e4e880dbbb 100644 --- a/routers/init.go +++ b/routers/init.go @@ -96,9 +96,7 @@ func GlobalInit() { // Booting long running goroutines. cron.NewContext() - if err := issue_indexer.InitIssueIndexer(false); err != nil { - log.Fatal("Failed to initialize issue indexer: %v", err) - } + issue_indexer.InitIssueIndexer(false) models.InitRepoIndexer() mirror_service.InitSyncMirrors() models.InitDeliverHooks() @@ -112,8 +110,15 @@ func GlobalInit() { } checkRunMode() - if setting.InstallLock && setting.SSH.StartBuiltinServer { - ssh.Listen(setting.SSH.ListenHost, setting.SSH.ListenPort, setting.SSH.ServerCiphers, setting.SSH.ServerKeyExchanges, setting.SSH.ServerMACs) - log.Info("SSH server started on %s:%d. Cipher list (%v), key exchange algorithms (%v), MACs (%v)", setting.SSH.ListenHost, setting.SSH.ListenPort, setting.SSH.ServerCiphers, setting.SSH.ServerKeyExchanges, setting.SSH.ServerMACs) + // Now because Install will re-run GlobalInit once it has set InstallLock + // we can't tell if the ssh port will remain unused until that's done. + // However, see FIXME comment in install.go + if setting.InstallLock { + if setting.SSH.StartBuiltinServer { + ssh.Listen(setting.SSH.ListenHost, setting.SSH.ListenPort, setting.SSH.ServerCiphers, setting.SSH.ServerKeyExchanges, setting.SSH.ServerMACs) + log.Info("SSH server started on %s:%d. Cipher list (%v), key exchange algorithms (%v), MACs (%v)", setting.SSH.ListenHost, setting.SSH.ListenPort, setting.SSH.ServerCiphers, setting.SSH.ServerKeyExchanges, setting.SSH.ServerMACs) + } else { + ssh.Unused() + } } } diff --git a/routers/install.go b/routers/install.go index 16888adc82..8f3d0d5ae6 100644 --- a/routers/install.go +++ b/routers/install.go @@ -386,6 +386,12 @@ func InstallPost(ctx *context.Context, form auth.InstallForm) { } log.Info("First-time run install finished!") + // FIXME: This isn't really enough to completely take account of new configuration + // We should really be restarting: + // - On windows this is probably just a simple restart + // - On linux we can't just use graceful.RestartProcess() everything that was passed in on LISTEN_FDS + // (active or not) needs to be passed out and everything new passed out too. + // This means we need to prevent the cleanup goroutine from running prior to the second GlobalInit ctx.Flash.Success(ctx.Tr("install.install_success")) ctx.Redirect(form.AppURL + "user/login") } diff --git a/vendor/github.com/facebookgo/clock/LICENSE b/vendor/github.com/facebookgo/clock/LICENSE deleted file mode 100644 index ce212cb1ce..0000000000 --- a/vendor/github.com/facebookgo/clock/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2014 Ben Johnson - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/vendor/github.com/facebookgo/clock/README.md b/vendor/github.com/facebookgo/clock/README.md deleted file mode 100644 index 5d4f4fe72e..0000000000 --- a/vendor/github.com/facebookgo/clock/README.md +++ /dev/null @@ -1,104 +0,0 @@ -clock [![Build Status](https://drone.io/github.com/benbjohnson/clock/status.png)](https://drone.io/github.com/benbjohnson/clock/latest) [![Coverage Status](https://coveralls.io/repos/benbjohnson/clock/badge.png?branch=master)](https://coveralls.io/r/benbjohnson/clock?branch=master) [![GoDoc](https://godoc.org/github.com/benbjohnson/clock?status.png)](https://godoc.org/github.com/benbjohnson/clock) ![Project status](http://img.shields.io/status/experimental.png?color=red) -===== - -Clock is a small library for mocking time in Go. It provides an interface -around the standard library's [`time`][time] package so that the application -can use the realtime clock while tests can use the mock clock. - -[time]: http://golang.org/pkg/time/ - - -## Usage - -### Realtime Clock - -Your application can maintain a `Clock` variable that will allow realtime and -mock clocks to be interchangable. For example, if you had an `Application` type: - -```go -import "github.com/benbjohnson/clock" - -type Application struct { - Clock clock.Clock -} -``` - -You could initialize it to use the realtime clock like this: - -```go -var app Application -app.Clock = clock.New() -... -``` - -Then all timers and time-related functionality should be performed from the -`Clock` variable. - - -### Mocking time - -In your tests, you will want to use a `Mock` clock: - -```go -import ( - "testing" - - "github.com/benbjohnson/clock" -) - -func TestApplication_DoSomething(t *testing.T) { - mock := clock.NewMock() - app := Application{Clock: mock} - ... -} -``` - -Now that you've initialized your application to use the mock clock, you can -adjust the time programmatically. The mock clock always starts from the Unix -epoch (midnight, Jan 1, 1970 UTC). - - -### Controlling time - -The mock clock provides the same functions that the standard library's `time` -package provides. For example, to find the current time, you use the `Now()` -function: - -```go -mock := clock.NewMock() - -// Find the current time. -mock.Now().UTC() // 1970-01-01 00:00:00 +0000 UTC - -// Move the clock forward. -mock.Add(2 * time.Hour) - -// Check the time again. It's 2 hours later! -mock.Now().UTC() // 1970-01-01 02:00:00 +0000 UTC -``` - -Timers and Tickers are also controlled by this same mock clock. They will only -execute when the clock is moved forward: - -``` -mock := clock.NewMock() -count := 0 - -// Kick off a timer to increment every 1 mock second. -go func() { - ticker := clock.Ticker(1 * time.Second) - for { - <-ticker.C - count++ - } -}() -runtime.Gosched() - -// Move the clock forward 10 second. -mock.Add(10 * time.Second) - -// This prints 10. -fmt.Println(count) -``` - - diff --git a/vendor/github.com/facebookgo/clock/clock.go b/vendor/github.com/facebookgo/clock/clock.go deleted file mode 100644 index bca1a7ba8b..0000000000 --- a/vendor/github.com/facebookgo/clock/clock.go +++ /dev/null @@ -1,363 +0,0 @@ -package clock - -import ( - "runtime" - "sort" - "sync" - "time" -) - -// Clock represents an interface to the functions in the standard library time -// package. Two implementations are available in the clock package. The first -// is a real-time clock which simply wraps the time package's functions. The -// second is a mock clock which will only make forward progress when -// programmatically adjusted. -type Clock interface { - After(d time.Duration) <-chan time.Time - AfterFunc(d time.Duration, f func()) *Timer - Now() time.Time - Sleep(d time.Duration) - Tick(d time.Duration) <-chan time.Time - Ticker(d time.Duration) *Ticker - Timer(d time.Duration) *Timer -} - -// New returns an instance of a real-time clock. -func New() Clock { - return &clock{} -} - -// clock implements a real-time clock by simply wrapping the time package functions. -type clock struct{} - -func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) } - -func (c *clock) AfterFunc(d time.Duration, f func()) *Timer { - return &Timer{timer: time.AfterFunc(d, f)} -} - -func (c *clock) Now() time.Time { return time.Now() } - -func (c *clock) Sleep(d time.Duration) { time.Sleep(d) } - -func (c *clock) Tick(d time.Duration) <-chan time.Time { return time.Tick(d) } - -func (c *clock) Ticker(d time.Duration) *Ticker { - t := time.NewTicker(d) - return &Ticker{C: t.C, ticker: t} -} - -func (c *clock) Timer(d time.Duration) *Timer { - t := time.NewTimer(d) - return &Timer{C: t.C, timer: t} -} - -// Mock represents a mock clock that only moves forward programmically. -// It can be preferable to a real-time clock when testing time-based functionality. -type Mock struct { - mu sync.Mutex - now time.Time // current time - timers clockTimers // tickers & timers - - calls Calls - waiting []waiting - callsMutex sync.Mutex -} - -// NewMock returns an instance of a mock clock. -// The current time of the mock clock on initialization is the Unix epoch. -func NewMock() *Mock { - return &Mock{now: time.Unix(0, 0)} -} - -// Add moves the current time of the mock clock forward by the duration. -// This should only be called from a single goroutine at a time. -func (m *Mock) Add(d time.Duration) { - // Calculate the final current time. - t := m.now.Add(d) - - // Continue to execute timers until there are no more before the new time. - for { - if !m.runNextTimer(t) { - break - } - } - - // Ensure that we end with the new time. - m.mu.Lock() - m.now = t - m.mu.Unlock() - - // Give a small buffer to make sure the other goroutines get handled. - gosched() -} - -// runNextTimer executes the next timer in chronological order and moves the -// current time to the timer's next tick time. The next time is not executed if -// it's next time if after the max time. Returns true if a timer is executed. -func (m *Mock) runNextTimer(max time.Time) bool { - m.mu.Lock() - - // Sort timers by time. - sort.Sort(m.timers) - - // If we have no more timers then exit. - if len(m.timers) == 0 { - m.mu.Unlock() - return false - } - - // Retrieve next timer. Exit if next tick is after new time. - t := m.timers[0] - if t.Next().After(max) { - m.mu.Unlock() - return false - } - - // Move "now" forward and unlock clock. - m.now = t.Next() - m.mu.Unlock() - - // Execute timer. - t.Tick(m.now) - return true -} - -// After waits for the duration to elapse and then sends the current time on the returned channel. -func (m *Mock) After(d time.Duration) <-chan time.Time { - defer m.inc(&m.calls.After) - return m.Timer(d).C -} - -// AfterFunc waits for the duration to elapse and then executes a function. -// A Timer is returned that can be stopped. -func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer { - defer m.inc(&m.calls.AfterFunc) - t := m.Timer(d) - t.C = nil - t.fn = f - return t -} - -// Now returns the current wall time on the mock clock. -func (m *Mock) Now() time.Time { - defer m.inc(&m.calls.Now) - m.mu.Lock() - defer m.mu.Unlock() - return m.now -} - -// Sleep pauses the goroutine for the given duration on the mock clock. -// The clock must be moved forward in a separate goroutine. -func (m *Mock) Sleep(d time.Duration) { - defer m.inc(&m.calls.Sleep) - <-m.After(d) -} - -// Tick is a convenience function for Ticker(). -// It will return a ticker channel that cannot be stopped. -func (m *Mock) Tick(d time.Duration) <-chan time.Time { - defer m.inc(&m.calls.Tick) - return m.Ticker(d).C -} - -// Ticker creates a new instance of Ticker. -func (m *Mock) Ticker(d time.Duration) *Ticker { - defer m.inc(&m.calls.Ticker) - m.mu.Lock() - defer m.mu.Unlock() - ch := make(chan time.Time) - t := &Ticker{ - C: ch, - c: ch, - mock: m, - d: d, - next: m.now.Add(d), - } - m.timers = append(m.timers, (*internalTicker)(t)) - return t -} - -// Timer creates a new instance of Timer. -func (m *Mock) Timer(d time.Duration) *Timer { - defer m.inc(&m.calls.Timer) - m.mu.Lock() - defer m.mu.Unlock() - ch := make(chan time.Time) - t := &Timer{ - C: ch, - c: ch, - mock: m, - next: m.now.Add(d), - } - m.timers = append(m.timers, (*internalTimer)(t)) - return t -} - -func (m *Mock) removeClockTimer(t clockTimer) { - m.mu.Lock() - defer m.mu.Unlock() - for i, timer := range m.timers { - if timer == t { - copy(m.timers[i:], m.timers[i+1:]) - m.timers[len(m.timers)-1] = nil - m.timers = m.timers[:len(m.timers)-1] - break - } - } - sort.Sort(m.timers) -} - -func (m *Mock) inc(addr *uint32) { - m.callsMutex.Lock() - defer m.callsMutex.Unlock() - *addr++ - var newWaiting []waiting - for _, w := range m.waiting { - if m.calls.atLeast(w.expected) { - close(w.done) - continue - } - newWaiting = append(newWaiting, w) - } - m.waiting = newWaiting -} - -// Wait waits for at least the relevant calls before returning. The expected -// Calls are always over the lifetime of the Mock. Values in the Calls struct -// are used as the minimum number of calls, this allows you to wait for only -// the calls you care about. -func (m *Mock) Wait(s Calls) { - m.callsMutex.Lock() - if m.calls.atLeast(s) { - m.callsMutex.Unlock() - return - } - done := make(chan struct{}) - m.waiting = append(m.waiting, waiting{expected: s, done: done}) - m.callsMutex.Unlock() - <-done -} - -// clockTimer represents an object with an associated start time. -type clockTimer interface { - Next() time.Time - Tick(time.Time) -} - -// clockTimers represents a list of sortable timers. -type clockTimers []clockTimer - -func (a clockTimers) Len() int { return len(a) } -func (a clockTimers) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a clockTimers) Less(i, j int) bool { return a[i].Next().Before(a[j].Next()) } - -// Timer represents a single event. -// The current time will be sent on C, unless the timer was created by AfterFunc. -type Timer struct { - C <-chan time.Time - c chan time.Time - timer *time.Timer // realtime impl, if set - next time.Time // next tick time - mock *Mock // mock clock, if set - fn func() // AfterFunc function, if set -} - -// Stop turns off the ticker. -func (t *Timer) Stop() { - if t.timer != nil { - t.timer.Stop() - } else { - t.mock.removeClockTimer((*internalTimer)(t)) - } -} - -type internalTimer Timer - -func (t *internalTimer) Next() time.Time { return t.next } -func (t *internalTimer) Tick(now time.Time) { - if t.fn != nil { - t.fn() - } else { - t.c <- now - } - t.mock.removeClockTimer((*internalTimer)(t)) - gosched() -} - -// Ticker holds a channel that receives "ticks" at regular intervals. -type Ticker struct { - C <-chan time.Time - c chan time.Time - ticker *time.Ticker // realtime impl, if set - next time.Time // next tick time - mock *Mock // mock clock, if set - d time.Duration // time between ticks -} - -// Stop turns off the ticker. -func (t *Ticker) Stop() { - if t.ticker != nil { - t.ticker.Stop() - } else { - t.mock.removeClockTimer((*internalTicker)(t)) - } -} - -type internalTicker Ticker - -func (t *internalTicker) Next() time.Time { return t.next } -func (t *internalTicker) Tick(now time.Time) { - select { - case t.c <- now: - case <-time.After(1 * time.Millisecond): - } - t.next = now.Add(t.d) - gosched() -} - -// Sleep momentarily so that other goroutines can process. -func gosched() { runtime.Gosched() } - -// Calls keeps track of the count of calls for each of the methods on the Clock -// interface. -type Calls struct { - After uint32 - AfterFunc uint32 - Now uint32 - Sleep uint32 - Tick uint32 - Ticker uint32 - Timer uint32 -} - -// atLeast returns true if at least the number of calls in o have been made. -func (c Calls) atLeast(o Calls) bool { - if c.After < o.After { - return false - } - if c.AfterFunc < o.AfterFunc { - return false - } - if c.Now < o.Now { - return false - } - if c.Sleep < o.Sleep { - return false - } - if c.Tick < o.Tick { - return false - } - if c.Ticker < o.Ticker { - return false - } - if c.Timer < o.Timer { - return false - } - return true -} - -type waiting struct { - expected Calls - done chan struct{} -} diff --git a/vendor/github.com/facebookgo/grace/gracehttp/http.go b/vendor/github.com/facebookgo/grace/gracehttp/http.go deleted file mode 100644 index 4ba8d284c8..0000000000 --- a/vendor/github.com/facebookgo/grace/gracehttp/http.go +++ /dev/null @@ -1,186 +0,0 @@ -// Package gracehttp provides easy to use graceful restart -// functionality for HTTP server. -package gracehttp - -import ( - "bytes" - "crypto/tls" - "flag" - "fmt" - "log" - "net" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - - "github.com/facebookgo/grace/gracenet" - "github.com/facebookgo/httpdown" -) - -var ( - verbose = flag.Bool("gracehttp.log", true, "Enable logging.") - didInherit = os.Getenv("LISTEN_FDS") != "" - ppid = os.Getppid() -) - -// An app contains one or more servers and associated configuration. -type app struct { - servers []*http.Server - http *httpdown.HTTP - net *gracenet.Net - listeners []net.Listener - sds []httpdown.Server - errors chan error -} - -func newApp(servers []*http.Server) *app { - return &app{ - servers: servers, - http: &httpdown.HTTP{}, - net: &gracenet.Net{}, - listeners: make([]net.Listener, 0, len(servers)), - sds: make([]httpdown.Server, 0, len(servers)), - - // 2x num servers for possible Close or Stop errors + 1 for possible - // StartProcess error. - errors: make(chan error, 1+(len(servers)*2)), - } -} - -func (a *app) listen() error { - for _, s := range a.servers { - // TODO: default addresses - l, err := a.net.Listen("tcp", s.Addr) - if err != nil { - return err - } - if s.TLSConfig != nil { - l = tls.NewListener(l, s.TLSConfig) - } - a.listeners = append(a.listeners, l) - } - return nil -} - -func (a *app) serve() { - for i, s := range a.servers { - a.sds = append(a.sds, a.http.Serve(s, a.listeners[i])) - } -} - -func (a *app) wait() { - var wg sync.WaitGroup - wg.Add(len(a.sds) * 2) // Wait & Stop - go a.signalHandler(&wg) - for _, s := range a.sds { - go func(s httpdown.Server) { - defer wg.Done() - if err := s.Wait(); err != nil { - a.errors <- err - } - }(s) - } - wg.Wait() -} - -func (a *app) term(wg *sync.WaitGroup) { - for _, s := range a.sds { - go func(s httpdown.Server) { - defer wg.Done() - if err := s.Stop(); err != nil { - a.errors <- err - } - }(s) - } -} - -func (a *app) signalHandler(wg *sync.WaitGroup) { - ch := make(chan os.Signal, 10) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2) - for { - sig := <-ch - switch sig { - case syscall.SIGINT, syscall.SIGTERM: - // this ensures a subsequent INT/TERM will trigger standard go behaviour of - // terminating. - signal.Stop(ch) - a.term(wg) - return - case syscall.SIGUSR2: - // we only return here if there's an error, otherwise the new process - // will send us a TERM when it's ready to trigger the actual shutdown. - if _, err := a.net.StartProcess(); err != nil { - a.errors <- err - } - } - } -} - -// Serve will serve the given http.Servers and will monitor for signals -// allowing for graceful termination (SIGTERM) or restart (SIGUSR2). -func Serve(servers ...*http.Server) error { - a := newApp(servers) - - // Acquire Listeners - if err := a.listen(); err != nil { - return err - } - - // Some useful logging. - if *verbose { - if didInherit { - if ppid == 1 { - log.Printf("Listening on init activated %s", pprintAddr(a.listeners)) - } else { - const msg = "Graceful handoff of %s with new pid %d and old pid %d" - log.Printf(msg, pprintAddr(a.listeners), os.Getpid(), ppid) - } - } else { - const msg = "Serving %s with pid %d" - log.Printf(msg, pprintAddr(a.listeners), os.Getpid()) - } - } - - // Start serving. - a.serve() - - // Close the parent if we inherited and it wasn't init that started us. - if didInherit && ppid != 1 { - if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil { - return fmt.Errorf("failed to close parent: %s", err) - } - } - - waitdone := make(chan struct{}) - go func() { - defer close(waitdone) - a.wait() - }() - - select { - case err := <-a.errors: - if err == nil { - panic("unexpected nil error") - } - return err - case <-waitdone: - if *verbose { - log.Printf("Exiting pid %d.", os.Getpid()) - } - return nil - } -} - -// Used for pretty printing addresses. -func pprintAddr(listeners []net.Listener) []byte { - var out bytes.Buffer - for i, l := range listeners { - if i != 0 { - fmt.Fprint(&out, ", ") - } - fmt.Fprint(&out, l.Addr()) - } - return out.Bytes() -} diff --git a/vendor/github.com/facebookgo/grace/gracenet/net.go b/vendor/github.com/facebookgo/grace/gracenet/net.go deleted file mode 100644 index a980954a9d..0000000000 --- a/vendor/github.com/facebookgo/grace/gracenet/net.go +++ /dev/null @@ -1,252 +0,0 @@ -// Package gracenet provides a family of Listen functions that either open a -// fresh connection or provide an inherited connection from when the process -// was started. The behave like their counterparts in the net package, but -// transparently provide support for graceful restarts without dropping -// connections. This is provided in a systemd socket activation compatible form -// to allow using socket activation. -// -// BUG: Doesn't handle closing of listeners. -package gracenet - -import ( - "fmt" - "net" - "os" - "os/exec" - "strconv" - "strings" - "sync" -) - -const ( - // Used to indicate a graceful restart in the new process. - envCountKey = "LISTEN_FDS" - envCountKeyPrefix = envCountKey + "=" -) - -// In order to keep the working directory the same as when we started we record -// it at startup. -var originalWD, _ = os.Getwd() - -// Net provides the family of Listen functions and maintains the associated -// state. Typically you will have only once instance of Net per application. -type Net struct { - inherited []net.Listener - active []net.Listener - mutex sync.Mutex - inheritOnce sync.Once - - // used in tests to override the default behavior of starting from fd 3. - fdStart int -} - -func (n *Net) inherit() error { - var retErr error - n.inheritOnce.Do(func() { - n.mutex.Lock() - defer n.mutex.Unlock() - countStr := os.Getenv(envCountKey) - if countStr == "" { - return - } - count, err := strconv.Atoi(countStr) - if err != nil { - retErr = fmt.Errorf("found invalid count value: %s=%s", envCountKey, countStr) - return - } - - // In tests this may be overridden. - fdStart := n.fdStart - if fdStart == 0 { - // In normal operations if we are inheriting, the listeners will begin at - // fd 3. - fdStart = 3 - } - - for i := fdStart; i < fdStart+count; i++ { - file := os.NewFile(uintptr(i), "listener") - l, err := net.FileListener(file) - if err != nil { - file.Close() - retErr = fmt.Errorf("error inheriting socket fd %d: %s", i, err) - return - } - if err := file.Close(); err != nil { - retErr = fmt.Errorf("error closing inherited socket fd %d: %s", i, err) - return - } - n.inherited = append(n.inherited, l) - } - }) - return retErr -} - -// Listen announces on the local network address laddr. The network net must be -// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It -// returns an inherited net.Listener for the matching network and address, or -// creates a new one using net.Listen. -func (n *Net) Listen(nett, laddr string) (net.Listener, error) { - switch nett { - default: - return nil, net.UnknownNetworkError(nett) - case "tcp", "tcp4", "tcp6": - addr, err := net.ResolveTCPAddr(nett, laddr) - if err != nil { - return nil, err - } - return n.ListenTCP(nett, addr) - case "unix", "unixpacket", "invalid_unix_net_for_test": - addr, err := net.ResolveUnixAddr(nett, laddr) - if err != nil { - return nil, err - } - return n.ListenUnix(nett, addr) - } -} - -// ListenTCP announces on the local network address laddr. The network net must -// be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the -// matching network and address, or creates a new one using net.ListenTCP. -func (n *Net) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) { - if err := n.inherit(); err != nil { - return nil, err - } - - n.mutex.Lock() - defer n.mutex.Unlock() - - // look for an inherited listener - for i, l := range n.inherited { - if l == nil { // we nil used inherited listeners - continue - } - if isSameAddr(l.Addr(), laddr) { - n.inherited[i] = nil - n.active = append(n.active, l) - return l.(*net.TCPListener), nil - } - } - - // make a fresh listener - l, err := net.ListenTCP(nett, laddr) - if err != nil { - return nil, err - } - n.active = append(n.active, l) - return l, nil -} - -// ListenUnix announces on the local network address laddr. The network net -// must be a: "unix" or "unixpacket". It returns an inherited net.Listener for -// the matching network and address, or creates a new one using net.ListenUnix. -func (n *Net) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) { - if err := n.inherit(); err != nil { - return nil, err - } - - n.mutex.Lock() - defer n.mutex.Unlock() - - // look for an inherited listener - for i, l := range n.inherited { - if l == nil { // we nil used inherited listeners - continue - } - if isSameAddr(l.Addr(), laddr) { - n.inherited[i] = nil - n.active = append(n.active, l) - return l.(*net.UnixListener), nil - } - } - - // make a fresh listener - l, err := net.ListenUnix(nett, laddr) - if err != nil { - return nil, err - } - n.active = append(n.active, l) - return l, nil -} - -// activeListeners returns a snapshot copy of the active listeners. -func (n *Net) activeListeners() ([]net.Listener, error) { - n.mutex.Lock() - defer n.mutex.Unlock() - ls := make([]net.Listener, len(n.active)) - copy(ls, n.active) - return ls, nil -} - -func isSameAddr(a1, a2 net.Addr) bool { - if a1.Network() != a2.Network() { - return false - } - a1s := a1.String() - a2s := a2.String() - if a1s == a2s { - return true - } - - // This allows for ipv6 vs ipv4 local addresses to compare as equal. This - // scenario is common when listening on localhost. - const ipv6prefix = "[::]" - a1s = strings.TrimPrefix(a1s, ipv6prefix) - a2s = strings.TrimPrefix(a2s, ipv6prefix) - const ipv4prefix = "0.0.0.0" - a1s = strings.TrimPrefix(a1s, ipv4prefix) - a2s = strings.TrimPrefix(a2s, ipv4prefix) - return a1s == a2s -} - -// StartProcess starts a new process passing it the active listeners. It -// doesn't fork, but starts a new process using the same environment and -// arguments as when it was originally started. This allows for a newly -// deployed binary to be started. It returns the pid of the newly started -// process when successful. -func (n *Net) StartProcess() (int, error) { - listeners, err := n.activeListeners() - if err != nil { - return 0, err - } - - // Extract the fds from the listeners. - files := make([]*os.File, len(listeners)) - for i, l := range listeners { - files[i], err = l.(filer).File() - if err != nil { - return 0, err - } - defer files[i].Close() - } - - // Use the original binary location. This works with symlinks such that if - // the file it points to has been changed we will use the updated symlink. - argv0, err := exec.LookPath(os.Args[0]) - if err != nil { - return 0, err - } - - // Pass on the environment and replace the old count key with the new one. - var env []string - for _, v := range os.Environ() { - if !strings.HasPrefix(v, envCountKeyPrefix) { - env = append(env, v) - } - } - env = append(env, fmt.Sprintf("%s%d", envCountKeyPrefix, len(listeners))) - - allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) - process, err := os.StartProcess(argv0, os.Args, &os.ProcAttr{ - Dir: originalWD, - Env: env, - Files: allFiles, - }) - if err != nil { - return 0, err - } - return process.Pid, nil -} - -type filer interface { - File() (*os.File, error) -} diff --git a/vendor/github.com/facebookgo/httpdown/.travis.yml b/vendor/github.com/facebookgo/httpdown/.travis.yml deleted file mode 100644 index ea316cfe50..0000000000 --- a/vendor/github.com/facebookgo/httpdown/.travis.yml +++ /dev/null @@ -1,23 +0,0 @@ -language: go - -go: - - 1.6 - -before_install: - - go get -v golang.org/x/tools/cmd/vet - - go get -v golang.org/x/tools/cmd/cover - - go get -v github.com/golang/lint/golint - -install: - - go install -race -v std - - go get -race -t -v ./... - - go install -race -v ./... - -script: - - go vet ./... - - $HOME/gopath/bin/golint . - - go test -cpu=2 -race -v ./... - - go test -cpu=2 -covermode=atomic -coverprofile=coverage.txt ./ - -after_success: - - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/facebookgo/httpdown/httpdown.go b/vendor/github.com/facebookgo/httpdown/httpdown.go deleted file mode 100644 index 34c5dea9ff..0000000000 --- a/vendor/github.com/facebookgo/httpdown/httpdown.go +++ /dev/null @@ -1,376 +0,0 @@ -// Package httpdown provides http.ConnState enabled graceful termination of -// http.Server. -package httpdown - -import ( - "crypto/tls" - "fmt" - "net" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "github.com/facebookgo/clock" - "github.com/facebookgo/stats" -) - -const ( - defaultStopTimeout = time.Minute - defaultKillTimeout = time.Minute -) - -// A Server allows encapsulates the process of accepting new connections and -// serving them, and gracefully shutting down the listener without dropping -// active connections. -type Server interface { - // Wait waits for the serving loop to finish. This will happen when Stop is - // called, at which point it returns no error, or if there is an error in the - // serving loop. You must call Wait after calling Serve or ListenAndServe. - Wait() error - - // Stop stops the listener. It will block until all connections have been - // closed. - Stop() error -} - -// HTTP defines the configuration for serving a http.Server. Multiple calls to -// Serve or ListenAndServe can be made on the same HTTP instance. The default -// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop() -// returns. -type HTTP struct { - // StopTimeout is the duration before we begin force closing connections. - // Defaults to 1 minute. - StopTimeout time.Duration - - // KillTimeout is the duration before which we completely give up and abort - // even though we still have connected clients. This is useful when a large - // number of client connections exist and closing them can take a long time. - // Note, this is in addition to the StopTimeout. Defaults to 1 minute. - KillTimeout time.Duration - - // Stats is optional. If provided, it will be used to record various metrics. - Stats stats.Client - - // Clock allows for testing timing related functionality. Do not specify this - // in production code. - Clock clock.Clock -} - -// Serve provides the low-level API which is useful if you're creating your own -// net.Listener. -func (h HTTP) Serve(s *http.Server, l net.Listener) Server { - stopTimeout := h.StopTimeout - if stopTimeout == 0 { - stopTimeout = defaultStopTimeout - } - killTimeout := h.KillTimeout - if killTimeout == 0 { - killTimeout = defaultKillTimeout - } - klock := h.Clock - if klock == nil { - klock = clock.New() - } - - ss := &server{ - stopTimeout: stopTimeout, - killTimeout: killTimeout, - stats: h.Stats, - clock: klock, - oldConnState: s.ConnState, - listener: l, - server: s, - serveDone: make(chan struct{}), - serveErr: make(chan error, 1), - new: make(chan net.Conn), - active: make(chan net.Conn), - idle: make(chan net.Conn), - closed: make(chan net.Conn), - stop: make(chan chan struct{}), - kill: make(chan chan struct{}), - } - s.ConnState = ss.connState - go ss.manage() - go ss.serve() - return ss -} - -// ListenAndServe returns a Server for the given http.Server. It is equivalent -// to ListenAndServe from the standard library, but returns immediately. -// Requests will be accepted in a background goroutine. If the http.Server has -// a non-nil TLSConfig, a TLS enabled listener will be setup. -func (h HTTP) ListenAndServe(s *http.Server) (Server, error) { - addr := s.Addr - if addr == "" { - if s.TLSConfig == nil { - addr = ":http" - } else { - addr = ":https" - } - } - l, err := net.Listen("tcp", addr) - if err != nil { - stats.BumpSum(h.Stats, "listen.error", 1) - return nil, err - } - if s.TLSConfig != nil { - l = tls.NewListener(l, s.TLSConfig) - } - return h.Serve(s, l), nil -} - -// server manages the serving process and allows for gracefully stopping it. -type server struct { - stopTimeout time.Duration - killTimeout time.Duration - stats stats.Client - clock clock.Clock - - oldConnState func(net.Conn, http.ConnState) - server *http.Server - serveDone chan struct{} - serveErr chan error - listener net.Listener - - new chan net.Conn - active chan net.Conn - idle chan net.Conn - closed chan net.Conn - stop chan chan struct{} - kill chan chan struct{} - - stopOnce sync.Once - stopErr error -} - -func (s *server) connState(c net.Conn, cs http.ConnState) { - if s.oldConnState != nil { - s.oldConnState(c, cs) - } - - switch cs { - case http.StateNew: - s.new <- c - case http.StateActive: - s.active <- c - case http.StateIdle: - s.idle <- c - case http.StateHijacked, http.StateClosed: - s.closed <- c - } -} - -func (s *server) manage() { - defer func() { - close(s.new) - close(s.active) - close(s.idle) - close(s.closed) - close(s.stop) - close(s.kill) - }() - - var stopDone chan struct{} - - conns := map[net.Conn]http.ConnState{} - var countNew, countActive, countIdle float64 - - // decConn decrements the count associated with the current state of the - // given connection. - decConn := func(c net.Conn) { - switch conns[c] { - default: - panic(fmt.Errorf("unknown existing connection: %s", c)) - case http.StateNew: - countNew-- - case http.StateActive: - countActive-- - case http.StateIdle: - countIdle-- - } - } - - // setup a ticker to report various values every minute. if we don't have a - // Stats implementation provided, we Stop it so it never ticks. - statsTicker := s.clock.Ticker(time.Minute) - if s.stats == nil { - statsTicker.Stop() - } - - for { - select { - case <-statsTicker.C: - // we'll only get here when s.stats is not nil - s.stats.BumpAvg("http-state.new", countNew) - s.stats.BumpAvg("http-state.active", countActive) - s.stats.BumpAvg("http-state.idle", countIdle) - s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle) - case c := <-s.new: - conns[c] = http.StateNew - countNew++ - case c := <-s.active: - decConn(c) - countActive++ - - conns[c] = http.StateActive - case c := <-s.idle: - decConn(c) - countIdle++ - - conns[c] = http.StateIdle - - // if we're already stopping, close it - if stopDone != nil { - c.Close() - } - case c := <-s.closed: - stats.BumpSum(s.stats, "conn.closed", 1) - decConn(c) - delete(conns, c) - - // if we're waiting to stop and are all empty, we just closed the last - // connection and we're done. - if stopDone != nil && len(conns) == 0 { - close(stopDone) - return - } - case stopDone = <-s.stop: - // if we're already all empty, we're already done - if len(conns) == 0 { - close(stopDone) - return - } - - // close current idle connections right away - for c, cs := range conns { - if cs == http.StateIdle { - c.Close() - } - } - - // continue the loop and wait for all the ConnState updates which will - // eventually close(stopDone) and return from this goroutine. - - case killDone := <-s.kill: - // force close all connections - stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns))) - for c := range conns { - c.Close() - } - - // don't block the kill. - close(killDone) - - // continue the loop and we wait for all the ConnState updates and will - // return from this goroutine when we're all done. otherwise we'll try to - // send those ConnState updates on closed channels. - - } - } -} - -func (s *server) serve() { - stats.BumpSum(s.stats, "serve", 1) - s.serveErr <- s.server.Serve(s.listener) - close(s.serveDone) - close(s.serveErr) -} - -func (s *server) Wait() error { - if err := <-s.serveErr; !isUseOfClosedError(err) { - return err - } - return nil -} - -func (s *server) Stop() error { - s.stopOnce.Do(func() { - defer stats.BumpTime(s.stats, "stop.time").End() - stats.BumpSum(s.stats, "stop", 1) - - // first disable keep-alive for new connections - s.server.SetKeepAlivesEnabled(false) - - // then close the listener so new connections can't connect come thru - closeErr := s.listener.Close() - <-s.serveDone - - // then trigger the background goroutine to stop and wait for it - stopDone := make(chan struct{}) - s.stop <- stopDone - - // wait for stop - select { - case <-stopDone: - case <-s.clock.After(s.stopTimeout): - defer stats.BumpTime(s.stats, "kill.time").End() - stats.BumpSum(s.stats, "kill", 1) - - // stop timed out, wait for kill - killDone := make(chan struct{}) - s.kill <- killDone - select { - case <-killDone: - case <-s.clock.After(s.killTimeout): - // kill timed out, give up - stats.BumpSum(s.stats, "kill.timeout", 1) - } - } - - if closeErr != nil && !isUseOfClosedError(closeErr) { - stats.BumpSum(s.stats, "listener.close.error", 1) - s.stopErr = closeErr - } - }) - return s.stopErr -} - -func isUseOfClosedError(err error) bool { - if err == nil { - return false - } - if opErr, ok := err.(*net.OpError); ok { - err = opErr.Err - } - return err.Error() == "use of closed network connection" -} - -// ListenAndServe is a convenience function to serve and wait for a SIGTERM -// or SIGINT before shutting down. -func ListenAndServe(s *http.Server, hd *HTTP) error { - if hd == nil { - hd = &HTTP{} - } - hs, err := hd.ListenAndServe(s) - if err != nil { - return err - } - - waiterr := make(chan error, 1) - go func() { - defer close(waiterr) - waiterr <- hs.Wait() - }() - - signals := make(chan os.Signal, 10) - signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) - - select { - case err := <-waiterr: - if err != nil { - return err - } - case <-signals: - signal.Stop(signals) - if err := hs.Stop(); err != nil { - return err - } - if err := <-waiterr; err != nil { - return err - } - } - return nil -} diff --git a/vendor/github.com/facebookgo/httpdown/license b/vendor/github.com/facebookgo/httpdown/license deleted file mode 100644 index d849082ffb..0000000000 --- a/vendor/github.com/facebookgo/httpdown/license +++ /dev/null @@ -1,30 +0,0 @@ -BSD License - -For httpdown software - -Copyright (c) 2015, Facebook, Inc. All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - - * Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - - * Neither the name Facebook nor the names of its contributors may be used to - endorse or promote products derived from this software without specific - prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/facebookgo/httpdown/patents b/vendor/github.com/facebookgo/httpdown/patents deleted file mode 100644 index f7133456a2..0000000000 --- a/vendor/github.com/facebookgo/httpdown/patents +++ /dev/null @@ -1,33 +0,0 @@ -Additional Grant of Patent Rights Version 2 - -"Software" means the httpdown software distributed by Facebook, Inc. - -Facebook, Inc. ("Facebook") hereby grants to each recipient of the Software -("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable -(subject to the termination provision below) license under any Necessary -Claims, to make, have made, use, sell, offer to sell, import, and otherwise -transfer the Software. For avoidance of doubt, no license is granted under -Facebook’s rights in any patent claims that are infringed by (i) modifications -to the Software made by you or any third party or (ii) the Software in -combination with any software or other technology. - -The license granted hereunder will terminate, automatically and without notice, -if you (or any of your subsidiaries, corporate affiliates or agents) initiate -directly or indirectly, or take a direct financial interest in, any Patent -Assertion: (i) against Facebook or any of its subsidiaries or corporate -affiliates, (ii) against any party if such Patent Assertion arises in whole or -in part from any software, technology, product or service of Facebook or any of -its subsidiaries or corporate affiliates, or (iii) against any party relating -to the Software. Notwithstanding the foregoing, if Facebook or any of its -subsidiaries or corporate affiliates files a lawsuit alleging patent -infringement against you in the first instance, and you respond by filing a -patent infringement counterclaim in that lawsuit against that party that is -unrelated to the Software, the license granted hereunder will not terminate -under section (i) of this paragraph due to such counterclaim. - -A "Necessary Claim" is a claim of a patent owned by Facebook that is -necessarily infringed by the Software standing alone. - -A "Patent Assertion" is any lawsuit or other action alleging direct, indirect, -or contributory infringement or inducement to infringe any patent, including a -cross-claim or counterclaim. diff --git a/vendor/github.com/facebookgo/httpdown/readme.md b/vendor/github.com/facebookgo/httpdown/readme.md deleted file mode 100644 index d5fa245dbc..0000000000 --- a/vendor/github.com/facebookgo/httpdown/readme.md +++ /dev/null @@ -1,41 +0,0 @@ -httpdown [![Build Status](https://secure.travis-ci.org/facebookgo/httpdown.png)](https://travis-ci.org/facebookgo/httpdown) -======== - -Documentation: https://godoc.org/github.com/facebookgo/httpdown - -Package httpdown provides a library that makes it easy to build a HTTP server -that can be shutdown gracefully (that is, without dropping any connections). - -If you want graceful restart and not just graceful shutdown, look at the -[grace](https://github.com/facebookgo/grace) package which uses this package -underneath but also provides graceful restart. - -Usage ------ - -Demo HTTP Server with graceful termination: -https://github.com/facebookgo/httpdown/blob/master/httpdown_example/main.go - -1. Install the demo application - - go get github.com/facebookgo/httpdown/httpdown_example - -1. Start it in the first terminal - - httpdown_example - - This will output something like: - - 2014/11/18 21:57:50 serving on http://127.0.0.1:8080/ with pid 17 - -1. In a second terminal start a slow HTTP request - - curl 'http://localhost:8080/?duration=20s' - -1. In a third terminal trigger a graceful shutdown (using the pid from your output): - - kill -TERM 17 - -This will demonstrate that the slow request was served before the server was -shutdown. You could also have used `Ctrl-C` instead of `kill` as the example -application triggers graceful shutdown on TERM or INT signals. diff --git a/vendor/github.com/facebookgo/stats/.travis.yml b/vendor/github.com/facebookgo/stats/.travis.yml deleted file mode 100644 index 9c9f036ae3..0000000000 --- a/vendor/github.com/facebookgo/stats/.travis.yml +++ /dev/null @@ -1,20 +0,0 @@ -language: go - -go: - - 1.5 - -before_install: - - go get -v golang.org/x/tools/cmd/vet - - go get -v golang.org/x/tools/cmd/cover - - go get -v github.com/golang/lint/golint - -install: - - go install -race -v std - - go get -race -t -v ./... - - go install -race -v ./... - -script: - - go vet ./... - - $HOME/gopath/bin/golint . - - go test -cpu=2 -race -v ./... - - go test -cpu=2 -covermode=atomic ./... diff --git a/vendor/github.com/facebookgo/stats/aggregation.go b/vendor/github.com/facebookgo/stats/aggregation.go deleted file mode 100644 index 8f57fb7f1b..0000000000 --- a/vendor/github.com/facebookgo/stats/aggregation.go +++ /dev/null @@ -1,35 +0,0 @@ -package stats - -import "sort" - -// Average returns the average value -func Average(values []float64) float64 { - if len(values) == 0 { - return 0 - } - - var val float64 - for _, point := range values { - val += point - } - return val / float64(len(values)) -} - -// Sum returns the sum of all the given values -func Sum(values []float64) float64 { - var val float64 - for _, point := range values { - val += point - } - return val -} - -// Percentiles returns a map containing the asked for percentiles -func Percentiles(values []float64, percentiles map[string]float64) map[string]float64 { - sort.Float64s(values) - results := map[string]float64{} - for label, p := range percentiles { - results[label] = values[int(float64(len(values))*p)] - } - return results -} diff --git a/vendor/github.com/facebookgo/stats/counter.go b/vendor/github.com/facebookgo/stats/counter.go deleted file mode 100644 index 59a0ed1e7d..0000000000 --- a/vendor/github.com/facebookgo/stats/counter.go +++ /dev/null @@ -1,112 +0,0 @@ -package stats - -import "fmt" - -// Type is the type of aggregation of apply -type Type int - -const ( - AggregateAvg Type = iota - AggregateSum - AggregateHistogram -) - -var ( - // HistogramPercentiles is used to determine which percentiles to return for - // SimpleCounter.Aggregate - HistogramPercentiles = map[string]float64{ - "p50": 0.5, - "p95": 0.95, - "p99": 0.99, - } - - // MinSamplesForPercentiles is used by SimpleCounter.Aggregate to determine - // what the minimum number of samples is required for percentile analysis - MinSamplesForPercentiles = 10 -) - -// Aggregates can be used to merge counters together. This is not goroutine safe -type Aggregates map[string]Counter - -// Add adds the counter for aggregation. This is not goroutine safe -func (a Aggregates) Add(c Counter) error { - key := c.FullKey() - if counter, ok := a[key]; ok { - if counter.GetType() != c.GetType() { - return fmt.Errorf("stats: mismatched aggregation type for: %s", key) - } - counter.AddValues(c.GetValues()...) - } else { - a[key] = c - } - return nil -} - -// Counter is the interface used by Aggregates to merge counters together -type Counter interface { - // FullKey is used to uniquely identify the counter - FullKey() string - - // AddValues adds values for aggregation - AddValues(...float64) - - // GetValues returns the values for aggregation - GetValues() []float64 - - // GetType returns the type of aggregation to apply - GetType() Type -} - -// SimpleCounter is a basic implementation of the Counter interface -type SimpleCounter struct { - Key string - Values []float64 - Type Type -} - -// FullKey is part of the Counter interace -func (s *SimpleCounter) FullKey() string { - return s.Key -} - -// GetValues is part of the Counter interface -func (s *SimpleCounter) GetValues() []float64 { - return s.Values -} - -// AddValues is part of the Counter interface -func (s *SimpleCounter) AddValues(vs ...float64) { - s.Values = append(s.Values, vs...) -} - -// GetType is part of the Counter interface -func (s *SimpleCounter) GetType() Type { - return s.Type -} - -// Aggregate aggregates the provided values appropriately, returning a map -// from key to value. If AggregateHistogram is specified, the map will contain -// the relevant percentiles as specified by HistogramPercentiles -func (s *SimpleCounter) Aggregate() map[string]float64 { - switch s.Type { - case AggregateAvg: - return map[string]float64{ - s.Key: Average(s.Values), - } - case AggregateSum: - return map[string]float64{ - s.Key: Sum(s.Values), - } - case AggregateHistogram: - histogram := map[string]float64{ - s.Key: Average(s.Values), - } - if len(s.Values) > MinSamplesForPercentiles { - for k, v := range Percentiles(s.Values, HistogramPercentiles) { - histogram[fmt.Sprintf("%s.%s", s.Key, k)] = v - } - } - return histogram - } - panic("stats: unsupported aggregation type") -} diff --git a/vendor/github.com/facebookgo/stats/license b/vendor/github.com/facebookgo/stats/license deleted file mode 100644 index feae870756..0000000000 --- a/vendor/github.com/facebookgo/stats/license +++ /dev/null @@ -1,30 +0,0 @@ -BSD License - -For stats software - -Copyright (c) 2015, Facebook, Inc. All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - - * Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - - * Neither the name Facebook nor the names of its contributors may be used to - endorse or promote products derived from this software without specific - prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/facebookgo/stats/patents b/vendor/github.com/facebookgo/stats/patents deleted file mode 100644 index 5d76172129..0000000000 --- a/vendor/github.com/facebookgo/stats/patents +++ /dev/null @@ -1,33 +0,0 @@ -Additional Grant of Patent Rights Version 2 - -"Software" means the stats software distributed by Facebook, Inc. - -Facebook, Inc. ("Facebook") hereby grants to each recipient of the Software -("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable -(subject to the termination provision below) license under any Necessary -Claims, to make, have made, use, sell, offer to sell, import, and otherwise -transfer the Software. For avoidance of doubt, no license is granted under -Facebook’s rights in any patent claims that are infringed by (i) modifications -to the Software made by you or any third party or (ii) the Software in -combination with any software or other technology. - -The license granted hereunder will terminate, automatically and without notice, -if you (or any of your subsidiaries, corporate affiliates or agents) initiate -directly or indirectly, or take a direct financial interest in, any Patent -Assertion: (i) against Facebook or any of its subsidiaries or corporate -affiliates, (ii) against any party if such Patent Assertion arises in whole or -in part from any software, technology, product or service of Facebook or any of -its subsidiaries or corporate affiliates, or (iii) against any party relating -to the Software. Notwithstanding the foregoing, if Facebook or any of its -subsidiaries or corporate affiliates files a lawsuit alleging patent -infringement against you in the first instance, and you respond by filing a -patent infringement counterclaim in that lawsuit against that party that is -unrelated to the Software, the license granted hereunder will not terminate -under section (i) of this paragraph due to such counterclaim. - -A "Necessary Claim" is a claim of a patent owned by Facebook that is -necessarily infringed by the Software standing alone. - -A "Patent Assertion" is any lawsuit or other action alleging direct, indirect, -or contributory infringement or inducement to infringe any patent, including a -cross-claim or counterclaim. diff --git a/vendor/github.com/facebookgo/stats/readme.md b/vendor/github.com/facebookgo/stats/readme.md deleted file mode 100644 index f268ed307b..0000000000 --- a/vendor/github.com/facebookgo/stats/readme.md +++ /dev/null @@ -1,4 +0,0 @@ -stats [![Build Status](https://secure.travis-ci.org/facebookgo/stats.png)](https://travis-ci.org/facebookgo/stats) -===== - -Documentation: https://godoc.org/github.com/facebookgo/stats diff --git a/vendor/github.com/facebookgo/stats/stats.go b/vendor/github.com/facebookgo/stats/stats.go deleted file mode 100644 index b833506aa2..0000000000 --- a/vendor/github.com/facebookgo/stats/stats.go +++ /dev/null @@ -1,166 +0,0 @@ -// Package stats defines a lightweight interface for collecting statistics. It -// doesn't provide an implementation, just the shared interface. -package stats - -// Client provides methods to collection statistics. -type Client interface { - // BumpAvg bumps the average for the given key. - BumpAvg(key string, val float64) - - // BumpSum bumps the sum for the given key. - BumpSum(key string, val float64) - - // BumpHistogram bumps the histogram for the given key. - BumpHistogram(key string, val float64) - - // BumpTime is a special version of BumpHistogram which is specialized for - // timers. Calling it starts the timer, and it returns a value on which End() - // can be called to indicate finishing the timer. A convenient way of - // recording the duration of a function is calling it like such at the top of - // the function: - // - // defer s.BumpTime("my.function").End() - BumpTime(key string) interface { - End() - } -} - -// PrefixClient adds multiple keys for the same value, with each prefix -// added to the key and calls the underlying client. -func PrefixClient(prefixes []string, client Client) Client { - return &prefixClient{ - Prefixes: prefixes, - Client: client, - } -} - -type prefixClient struct { - Prefixes []string - Client Client -} - -func (p *prefixClient) BumpAvg(key string, val float64) { - for _, prefix := range p.Prefixes { - p.Client.BumpAvg(prefix+key, val) - } -} - -func (p *prefixClient) BumpSum(key string, val float64) { - for _, prefix := range p.Prefixes { - p.Client.BumpSum(prefix+key, val) - } -} - -func (p *prefixClient) BumpHistogram(key string, val float64) { - for _, prefix := range p.Prefixes { - p.Client.BumpHistogram(prefix+key, val) - } -} - -func (p *prefixClient) BumpTime(key string) interface { - End() -} { - var m multiEnder - for _, prefix := range p.Prefixes { - m = append(m, p.Client.BumpTime(prefix+key)) - } - return m -} - -// multiEnder combines many enders together. -type multiEnder []interface { - End() -} - -func (m multiEnder) End() { - for _, e := range m { - e.End() - } -} - -// HookClient is useful for testing. It provides optional hooks for each -// expected method in the interface, which if provided will be called. If a -// hook is not provided, it will be ignored. -type HookClient struct { - BumpAvgHook func(key string, val float64) - BumpSumHook func(key string, val float64) - BumpHistogramHook func(key string, val float64) - BumpTimeHook func(key string) interface { - End() - } -} - -// BumpAvg will call BumpAvgHook if defined. -func (c *HookClient) BumpAvg(key string, val float64) { - if c.BumpAvgHook != nil { - c.BumpAvgHook(key, val) - } -} - -// BumpSum will call BumpSumHook if defined. -func (c *HookClient) BumpSum(key string, val float64) { - if c.BumpSumHook != nil { - c.BumpSumHook(key, val) - } -} - -// BumpHistogram will call BumpHistogramHook if defined. -func (c *HookClient) BumpHistogram(key string, val float64) { - if c.BumpHistogramHook != nil { - c.BumpHistogramHook(key, val) - } -} - -// BumpTime will call BumpTimeHook if defined. -func (c *HookClient) BumpTime(key string) interface { - End() -} { - if c.BumpTimeHook != nil { - return c.BumpTimeHook(key) - } - return NoOpEnd -} - -type noOpEnd struct{} - -func (n noOpEnd) End() {} - -// NoOpEnd provides a dummy value for use in tests as valid return value for -// BumpTime(). -var NoOpEnd = noOpEnd{} - -// BumpAvg calls BumpAvg on the Client if it isn't nil. This is useful when a -// component has an optional stats.Client. -func BumpAvg(c Client, key string, val float64) { - if c != nil { - c.BumpAvg(key, val) - } -} - -// BumpSum calls BumpSum on the Client if it isn't nil. This is useful when a -// component has an optional stats.Client. -func BumpSum(c Client, key string, val float64) { - if c != nil { - c.BumpSum(key, val) - } -} - -// BumpHistogram calls BumpHistogram on the Client if it isn't nil. This is -// useful when a component has an optional stats.Client. -func BumpHistogram(c Client, key string, val float64) { - if c != nil { - c.BumpHistogram(key, val) - } -} - -// BumpTime calls BumpTime on the Client if it isn't nil. If the Client is nil -// it still returns a valid return value which will be a no-op. This is useful -// when a component has an optional stats.Client. -func BumpTime(c Client, key string) interface { - End() -} { - if c != nil { - return c.BumpTime(key) - } - return NoOpEnd -} diff --git a/vendor/github.com/facebookgo/stats/stopper.go b/vendor/github.com/facebookgo/stats/stopper.go deleted file mode 100644 index 38e8eab80a..0000000000 --- a/vendor/github.com/facebookgo/stats/stopper.go +++ /dev/null @@ -1,17 +0,0 @@ -package stats - -import "time" - -// Stopper calls Client.BumpSum and Client.BumpHistogram when End'ed -type Stopper struct { - Key string - Start time.Time - Client Client -} - -// End the Stopper -func (s *Stopper) End() { - since := time.Since(s.Start).Seconds() * 1000.0 - s.Client.BumpSum(s.Key+".total", since) - s.Client.BumpHistogram(s.Key, since) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 30b1370933..52bb23dfda 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -132,15 +132,6 @@ github.com/emirpasic/gods/utils github.com/etcd-io/bbolt # github.com/ethantkoenig/rupture v0.0.0-20180203182544-0a76f03a811a github.com/ethantkoenig/rupture -# github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a -github.com/facebookgo/clock -# github.com/facebookgo/grace v0.0.0-20160926231715-5729e484473f -github.com/facebookgo/grace/gracehttp -github.com/facebookgo/grace/gracenet -# github.com/facebookgo/httpdown v0.0.0-20160323221027-a3b1354551a2 -github.com/facebookgo/httpdown -# github.com/facebookgo/stats v0.0.0-20151006221625-1b76add642e4 -github.com/facebookgo/stats # github.com/fsnotify/fsnotify v1.4.7 github.com/fsnotify/fsnotify # github.com/gliderlabs/ssh v0.2.2 |