You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

server.go 7.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. // +build !windows
  2. // Copyright 2019 The Gitea Authors. All rights reserved.
  3. // Use of this source code is governed by a MIT-style
  4. // license that can be found in the LICENSE file.
  5. // This code is highly inspired by endless go
  6. package graceful
  7. import (
  8. "crypto/tls"
  9. "net"
  10. "os"
  11. "strings"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "code.gitea.io/gitea/modules/log"
  16. )
  // state identifies the lifecycle phase a Server is currently in.
  type state uint8

  // Lifecycle phases, numbered consecutively from zero in declaration order.
  const (
      // stateInit is the phase assigned to a Server by NewServer.
      stateInit = state(iota)
      // stateRunning is set by Serve just before the serve function is called.
      stateRunning
      // stateShuttingDown is presumably set once a shutdown has been requested
      // (the transition is not made in this file - confirm against the signal handling code).
      stateShuttingDown
      // stateTerminate is set by Serve once all tracked connections have finished.
      stateTerminate
  )
  24. var (
  25. // RWMutex for when adding servers or shutting down
  26. runningServerReg sync.RWMutex
  27. runningServerWG sync.WaitGroup
  28. // ensure we only fork once
  29. runningServersForked bool
  30. // DefaultReadTimeOut default read timeout
  31. DefaultReadTimeOut time.Duration
  32. // DefaultWriteTimeOut default write timeout
  33. DefaultWriteTimeOut time.Duration
  34. // DefaultMaxHeaderBytes default max header bytes
  35. DefaultMaxHeaderBytes int
  36. // IsChild reports if we are a fork iff LISTEN_FDS is set and our parent PID is not 1
  37. IsChild = len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1
  38. )
  39. func init() {
  40. runningServerReg = sync.RWMutex{}
  41. runningServerWG = sync.WaitGroup{}
  42. DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB)
  43. }
  // ServeFunction is the signature of an accept loop: it is handed the listener
  // to accept connections from and returns the error that ended the loop.
  type ServeFunction = func(listener net.Listener) error
  46. // Server represents our graceful server
  47. type Server struct {
  48. network string
  49. address string
  50. listener net.Listener
  51. PreSignalHooks map[os.Signal][]func()
  52. PostSignalHooks map[os.Signal][]func()
  53. wg sync.WaitGroup
  54. sigChan chan os.Signal
  55. state state
  56. lock *sync.RWMutex
  57. BeforeBegin func(network, address string)
  58. OnShutdown func()
  59. }
  60. // WaitForServers waits for all running servers to finish
  61. func WaitForServers() {
  62. runningServerWG.Wait()
  63. }
  64. // NewServer creates a server on network at provided address
  65. func NewServer(network, address string) *Server {
  66. runningServerReg.Lock()
  67. defer runningServerReg.Unlock()
  68. if IsChild {
  69. log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid())
  70. } else {
  71. log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid())
  72. }
  73. srv := &Server{
  74. wg: sync.WaitGroup{},
  75. sigChan: make(chan os.Signal),
  76. PreSignalHooks: map[os.Signal][]func(){},
  77. PostSignalHooks: map[os.Signal][]func(){},
  78. state: stateInit,
  79. lock: &sync.RWMutex{},
  80. network: network,
  81. address: address,
  82. }
  83. srv.BeforeBegin = func(network, addr string) {
  84. log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid())
  85. }
  86. return srv
  87. }
  88. // ListenAndServe listens on the provided network address and then calls Serve
  89. // to handle requests on incoming connections.
  90. func (srv *Server) ListenAndServe(serve ServeFunction) error {
  91. go srv.handleSignals()
  92. l, err := GetListener(srv.network, srv.address)
  93. if err != nil {
  94. log.Error("Unable to GetListener: %v", err)
  95. return err
  96. }
  97. srv.listener = newWrappedListener(l, srv)
  98. KillParent()
  99. srv.BeforeBegin(srv.network, srv.address)
  100. return srv.Serve(serve)
  101. }
  102. // ListenAndServeTLS listens on the provided network address and then calls
  103. // Serve to handle requests on incoming TLS connections.
  104. //
  105. // Filenames containing a certificate and matching private key for the server must
  106. // be provided. If the certificate is signed by a certificate authority, the
  107. // certFile should be the concatenation of the server's certificate followed by the
  108. // CA's certificate.
  109. func (srv *Server) ListenAndServeTLS(certFile, keyFile string, serve ServeFunction) error {
  110. config := &tls.Config{}
  111. if config.NextProtos == nil {
  112. config.NextProtos = []string{"http/1.1"}
  113. }
  114. config.Certificates = make([]tls.Certificate, 1)
  115. var err error
  116. config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
  117. if err != nil {
  118. log.Error("Failed to load https cert file %s for %s:%s: %v", certFile, srv.network, srv.address, err)
  119. return err
  120. }
  121. return srv.ListenAndServeTLSConfig(config, serve)
  122. }
  123. // ListenAndServeTLSConfig listens on the provided network address and then calls
  124. // Serve to handle requests on incoming TLS connections.
  125. func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFunction) error {
  126. go srv.handleSignals()
  127. l, err := GetListener(srv.network, srv.address)
  128. if err != nil {
  129. log.Error("Unable to get Listener: %v", err)
  130. return err
  131. }
  132. wl := newWrappedListener(l, srv)
  133. srv.listener = tls.NewListener(wl, tlsConfig)
  134. KillParent()
  135. srv.BeforeBegin(srv.network, srv.address)
  136. return srv.Serve(serve)
  137. }
  138. // Serve accepts incoming HTTP connections on the wrapped listener l, creating a new
  139. // service goroutine for each. The service goroutines read requests and then call
  140. // handler to reply to them. Handler is typically nil, in which case the
  141. // DefaultServeMux is used.
  142. //
  143. // In addition to the standard Serve behaviour each connection is added to a
  144. // sync.Waitgroup so that all outstanding connections can be served before shutting
  145. // down the server.
  146. func (srv *Server) Serve(serve ServeFunction) error {
  147. defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid())
  148. srv.setState(stateRunning)
  149. runningServerWG.Add(1)
  150. err := serve(srv.listener)
  151. log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid())
  152. srv.wg.Wait()
  153. srv.setState(stateTerminate)
  154. runningServerWG.Done()
  155. // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil
  156. if err != nil && strings.Contains(err.Error(), "use of closed") {
  157. return nil
  158. }
  159. return err
  160. }
  161. func (srv *Server) getState() state {
  162. srv.lock.RLock()
  163. defer srv.lock.RUnlock()
  164. return srv.state
  165. }
  166. func (srv *Server) setState(st state) {
  167. srv.lock.Lock()
  168. defer srv.lock.Unlock()
  169. srv.state = st
  170. }
  171. type wrappedListener struct {
  172. net.Listener
  173. stopped bool
  174. server *Server
  175. }
  176. func newWrappedListener(l net.Listener, srv *Server) *wrappedListener {
  177. return &wrappedListener{
  178. Listener: l,
  179. server: srv,
  180. }
  181. }
  182. func (wl *wrappedListener) Accept() (net.Conn, error) {
  183. var c net.Conn
  184. // Set keepalive on TCPListeners connections.
  185. if tcl, ok := wl.Listener.(*net.TCPListener); ok {
  186. tc, err := tcl.AcceptTCP()
  187. if err != nil {
  188. return nil, err
  189. }
  190. _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener
  191. _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
  192. c = tc
  193. } else {
  194. var err error
  195. c, err = wl.Listener.Accept()
  196. if err != nil {
  197. return nil, err
  198. }
  199. }
  200. c = wrappedConn{
  201. Conn: c,
  202. server: wl.server,
  203. }
  204. wl.server.wg.Add(1)
  205. return c, nil
  206. }
  207. func (wl *wrappedListener) Close() error {
  208. if wl.stopped {
  209. return syscall.EINVAL
  210. }
  211. wl.stopped = true
  212. return wl.Listener.Close()
  213. }
  214. func (wl *wrappedListener) File() (*os.File, error) {
  215. // returns a dup(2) - FD_CLOEXEC flag *not* set so the listening socket can be passed to child processes
  216. return wl.Listener.(filer).File()
  217. }
  218. type wrappedConn struct {
  219. net.Conn
  220. server *Server
  221. }
  222. func (w wrappedConn) Close() error {
  223. err := w.Conn.Close()
  224. if err == nil {
  225. w.server.wg.Done()
  226. }
  227. return err
  228. }