You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

server.go 7.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. // This code is highly inspired by endless go
  5. package graceful
  6. import (
  7. "crypto/tls"
  8. "net"
  9. "os"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "syscall"
  14. "time"
  15. "code.gitea.io/gitea/modules/log"
  16. "code.gitea.io/gitea/modules/setting"
  17. )
  18. var (
  19. // DefaultReadTimeOut default read timeout
  20. DefaultReadTimeOut time.Duration
  21. // DefaultWriteTimeOut default write timeout
  22. DefaultWriteTimeOut time.Duration
  23. // DefaultMaxHeaderBytes default max header bytes
  24. DefaultMaxHeaderBytes int
  25. // PerWriteWriteTimeout timeout for writes
  26. PerWriteWriteTimeout = 30 * time.Second
  27. // PerWriteWriteTimeoutKbTime is a timeout taking account of how much there is to be written
  28. PerWriteWriteTimeoutKbTime = 10 * time.Second
  29. )
  30. func init() {
  31. DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB)
  32. }
  33. // ServeFunction represents a listen.Accept loop
  34. type ServeFunction = func(net.Listener) error
  35. // Server represents our graceful server
  36. type Server struct {
  37. network string
  38. address string
  39. listener net.Listener
  40. wg sync.WaitGroup
  41. state state
  42. lock *sync.RWMutex
  43. BeforeBegin func(network, address string)
  44. OnShutdown func()
  45. PerWriteTimeout time.Duration
  46. PerWritePerKbTimeout time.Duration
  47. }
  48. // NewServer creates a server on network at provided address
  49. func NewServer(network, address, name string) *Server {
  50. if GetManager().IsChild() {
  51. log.Info("Restarting new %s server: %s:%s on PID: %d", name, network, address, os.Getpid())
  52. } else {
  53. log.Info("Starting new %s server: %s:%s on PID: %d", name, network, address, os.Getpid())
  54. }
  55. srv := &Server{
  56. wg: sync.WaitGroup{},
  57. state: stateInit,
  58. lock: &sync.RWMutex{},
  59. network: network,
  60. address: address,
  61. PerWriteTimeout: setting.PerWriteTimeout,
  62. PerWritePerKbTimeout: setting.PerWritePerKbTimeout,
  63. }
  64. srv.BeforeBegin = func(network, addr string) {
  65. log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid())
  66. }
  67. return srv
  68. }
  69. // ListenAndServe listens on the provided network address and then calls Serve
  70. // to handle requests on incoming connections.
  71. func (srv *Server) ListenAndServe(serve ServeFunction) error {
  72. go srv.awaitShutdown()
  73. l, err := GetListener(srv.network, srv.address)
  74. if err != nil {
  75. log.Error("Unable to GetListener: %v", err)
  76. return err
  77. }
  78. srv.listener = newWrappedListener(l, srv)
  79. srv.BeforeBegin(srv.network, srv.address)
  80. return srv.Serve(serve)
  81. }
  82. // ListenAndServeTLSConfig listens on the provided network address and then calls
  83. // Serve to handle requests on incoming TLS connections.
  84. func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFunction) error {
  85. go srv.awaitShutdown()
  86. if tlsConfig.MinVersion == 0 {
  87. tlsConfig.MinVersion = tls.VersionTLS12
  88. }
  89. l, err := GetListener(srv.network, srv.address)
  90. if err != nil {
  91. log.Error("Unable to get Listener: %v", err)
  92. return err
  93. }
  94. wl := newWrappedListener(l, srv)
  95. srv.listener = tls.NewListener(wl, tlsConfig)
  96. srv.BeforeBegin(srv.network, srv.address)
  97. return srv.Serve(serve)
  98. }
  99. // Serve accepts incoming HTTP connections on the wrapped listener l, creating a new
  100. // service goroutine for each. The service goroutines read requests and then call
  101. // handler to reply to them. Handler is typically nil, in which case the
  102. // DefaultServeMux is used.
  103. //
  104. // In addition to the standard Serve behaviour each connection is added to a
  105. // sync.Waitgroup so that all outstanding connections can be served before shutting
  106. // down the server.
  107. func (srv *Server) Serve(serve ServeFunction) error {
  108. defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid())
  109. srv.setState(stateRunning)
  110. GetManager().RegisterServer()
  111. err := serve(srv.listener)
  112. log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid())
  113. srv.wg.Wait()
  114. srv.setState(stateTerminate)
  115. GetManager().ServerDone()
  116. // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil
  117. if err == nil || strings.Contains(err.Error(), "use of closed") || strings.Contains(err.Error(), "http: Server closed") {
  118. return nil
  119. }
  120. return err
  121. }
  122. func (srv *Server) getState() state {
  123. srv.lock.RLock()
  124. defer srv.lock.RUnlock()
  125. return srv.state
  126. }
  127. func (srv *Server) setState(st state) {
  128. srv.lock.Lock()
  129. defer srv.lock.Unlock()
  130. srv.state = st
  131. }
  132. type filer interface {
  133. File() (*os.File, error)
  134. }
  135. type wrappedListener struct {
  136. net.Listener
  137. stopped bool
  138. server *Server
  139. }
  140. func newWrappedListener(l net.Listener, srv *Server) *wrappedListener {
  141. return &wrappedListener{
  142. Listener: l,
  143. server: srv,
  144. }
  145. }
  146. func (wl *wrappedListener) Accept() (net.Conn, error) {
  147. var c net.Conn
  148. // Set keepalive on TCPListeners connections.
  149. if tcl, ok := wl.Listener.(*net.TCPListener); ok {
  150. tc, err := tcl.AcceptTCP()
  151. if err != nil {
  152. return nil, err
  153. }
  154. _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener
  155. _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
  156. c = tc
  157. } else {
  158. var err error
  159. c, err = wl.Listener.Accept()
  160. if err != nil {
  161. return nil, err
  162. }
  163. }
  164. closed := int32(0)
  165. c = &wrappedConn{
  166. Conn: c,
  167. server: wl.server,
  168. closed: &closed,
  169. perWriteTimeout: wl.server.PerWriteTimeout,
  170. perWritePerKbTimeout: wl.server.PerWritePerKbTimeout,
  171. }
  172. wl.server.wg.Add(1)
  173. return c, nil
  174. }
  175. func (wl *wrappedListener) Close() error {
  176. if wl.stopped {
  177. return syscall.EINVAL
  178. }
  179. wl.stopped = true
  180. return wl.Listener.Close()
  181. }
  182. func (wl *wrappedListener) File() (*os.File, error) {
  183. // returns a dup(2) - FD_CLOEXEC flag *not* set so the listening socket can be passed to child processes
  184. return wl.Listener.(filer).File()
  185. }
  186. type wrappedConn struct {
  187. net.Conn
  188. server *Server
  189. closed *int32
  190. deadline time.Time
  191. perWriteTimeout time.Duration
  192. perWritePerKbTimeout time.Duration
  193. }
  194. func (w *wrappedConn) Write(p []byte) (n int, err error) {
  195. if w.perWriteTimeout > 0 {
  196. minTimeout := time.Duration(len(p)/1024) * w.perWritePerKbTimeout
  197. minDeadline := time.Now().Add(minTimeout).Add(w.perWriteTimeout)
  198. w.deadline = w.deadline.Add(minTimeout)
  199. if minDeadline.After(w.deadline) {
  200. w.deadline = minDeadline
  201. }
  202. _ = w.Conn.SetWriteDeadline(w.deadline)
  203. }
  204. return w.Conn.Write(p)
  205. }
  206. func (w *wrappedConn) Close() error {
  207. if atomic.CompareAndSwapInt32(w.closed, 0, 1) {
  208. defer func() {
  209. if err := recover(); err != nil {
  210. select {
  211. case <-GetManager().IsHammer():
  212. // Likely deadlocked request released at hammertime
  213. log.Warn("Panic during connection close! %v. Likely there has been a deadlocked request which has been released by forced shutdown.", err)
  214. default:
  215. log.Error("Panic during connection close! %v", err)
  216. }
  217. }
  218. }()
  219. w.server.wg.Done()
  220. }
  221. return w.Conn.Close()
  222. }