You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

server.go 8.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. // This code is highly inspired by endless go
  4. package graceful
  5. import (
  6. "crypto/tls"
  7. "net"
  8. "os"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "syscall"
  13. "time"
  14. "code.gitea.io/gitea/modules/log"
  15. "code.gitea.io/gitea/modules/proxyprotocol"
  16. "code.gitea.io/gitea/modules/setting"
  17. )
  18. // GetListener returns a net listener
  19. // This determines the implementation of net.Listener which the server will use,
  20. // so that downstreams could provide their own Listener, such as with a hidden service or a p2p network
  21. var GetListener = DefaultGetListener
  22. // ServeFunction represents a listen.Accept loop
  23. type ServeFunction = func(net.Listener) error
  24. // Server represents our graceful server
  25. type Server struct {
  26. network string
  27. address string
  28. listener net.Listener
  29. wg sync.WaitGroup
  30. state state
  31. lock *sync.RWMutex
  32. BeforeBegin func(network, address string)
  33. OnShutdown func()
  34. PerWriteTimeout time.Duration
  35. PerWritePerKbTimeout time.Duration
  36. }
  37. // NewServer creates a server on network at provided address
  38. func NewServer(network, address, name string) *Server {
  39. if GetManager().IsChild() {
  40. log.Info("Restarting new %s server: %s:%s on PID: %d", name, network, address, os.Getpid())
  41. } else {
  42. log.Info("Starting new %s server: %s:%s on PID: %d", name, network, address, os.Getpid())
  43. }
  44. srv := &Server{
  45. wg: sync.WaitGroup{},
  46. state: stateInit,
  47. lock: &sync.RWMutex{},
  48. network: network,
  49. address: address,
  50. PerWriteTimeout: setting.PerWriteTimeout,
  51. PerWritePerKbTimeout: setting.PerWritePerKbTimeout,
  52. }
  53. srv.BeforeBegin = func(network, addr string) {
  54. log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid())
  55. }
  56. return srv
  57. }
  58. // ListenAndServe listens on the provided network address and then calls Serve
  59. // to handle requests on incoming connections.
  60. func (srv *Server) ListenAndServe(serve ServeFunction, useProxyProtocol bool) error {
  61. go srv.awaitShutdown()
  62. listener, err := GetListener(srv.network, srv.address)
  63. if err != nil {
  64. log.Error("Unable to GetListener: %v", err)
  65. return err
  66. }
  67. // we need to wrap the listener to take account of our lifecycle
  68. listener = newWrappedListener(listener, srv)
  69. // Now we need to take account of ProxyProtocol settings...
  70. if useProxyProtocol {
  71. listener = &proxyprotocol.Listener{
  72. Listener: listener,
  73. ProxyHeaderTimeout: setting.ProxyProtocolHeaderTimeout,
  74. AcceptUnknown: setting.ProxyProtocolAcceptUnknown,
  75. }
  76. }
  77. srv.listener = listener
  78. srv.BeforeBegin(srv.network, srv.address)
  79. return srv.Serve(serve)
  80. }
  81. // ListenAndServeTLSConfig listens on the provided network address and then calls
  82. // Serve to handle requests on incoming TLS connections.
  83. func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFunction, useProxyProtocol, proxyProtocolTLSBridging bool) error {
  84. go srv.awaitShutdown()
  85. if tlsConfig.MinVersion == 0 {
  86. tlsConfig.MinVersion = tls.VersionTLS12
  87. }
  88. listener, err := GetListener(srv.network, srv.address)
  89. if err != nil {
  90. log.Error("Unable to get Listener: %v", err)
  91. return err
  92. }
  93. // we need to wrap the listener to take account of our lifecycle
  94. listener = newWrappedListener(listener, srv)
  95. // Now we need to take account of ProxyProtocol settings... If we're not bridging then we expect that the proxy will forward the connection to us
  96. if useProxyProtocol && !proxyProtocolTLSBridging {
  97. listener = &proxyprotocol.Listener{
  98. Listener: listener,
  99. ProxyHeaderTimeout: setting.ProxyProtocolHeaderTimeout,
  100. AcceptUnknown: setting.ProxyProtocolAcceptUnknown,
  101. }
  102. }
  103. // Now handle the tls protocol
  104. listener = tls.NewListener(listener, tlsConfig)
  105. // Now if we're bridging then we need the proxy to tell us who we're bridging for...
  106. if useProxyProtocol && proxyProtocolTLSBridging {
  107. listener = &proxyprotocol.Listener{
  108. Listener: listener,
  109. ProxyHeaderTimeout: setting.ProxyProtocolHeaderTimeout,
  110. AcceptUnknown: setting.ProxyProtocolAcceptUnknown,
  111. }
  112. }
  113. srv.listener = listener
  114. srv.BeforeBegin(srv.network, srv.address)
  115. return srv.Serve(serve)
  116. }
  117. // Serve accepts incoming HTTP connections on the wrapped listener l, creating a new
  118. // service goroutine for each. The service goroutines read requests and then call
  119. // handler to reply to them. Handler is typically nil, in which case the
  120. // DefaultServeMux is used.
  121. //
  122. // In addition to the standard Serve behaviour each connection is added to a
  123. // sync.Waitgroup so that all outstanding connections can be served before shutting
  124. // down the server.
  125. func (srv *Server) Serve(serve ServeFunction) error {
  126. defer log.Debug("Serve() returning... (PID: %d)", syscall.Getpid())
  127. srv.setState(stateRunning)
  128. GetManager().RegisterServer()
  129. err := serve(srv.listener)
  130. log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid())
  131. srv.wg.Wait()
  132. srv.setState(stateTerminate)
  133. GetManager().ServerDone()
  134. // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil
  135. if err == nil || strings.Contains(err.Error(), "use of closed") || strings.Contains(err.Error(), "http: Server closed") {
  136. return nil
  137. }
  138. return err
  139. }
  140. func (srv *Server) getState() state {
  141. srv.lock.RLock()
  142. defer srv.lock.RUnlock()
  143. return srv.state
  144. }
  145. func (srv *Server) setState(st state) {
  146. srv.lock.Lock()
  147. defer srv.lock.Unlock()
  148. srv.state = st
  149. }
  150. type filer interface {
  151. File() (*os.File, error)
  152. }
  153. type wrappedListener struct {
  154. net.Listener
  155. stopped bool
  156. server *Server
  157. }
  158. func newWrappedListener(l net.Listener, srv *Server) *wrappedListener {
  159. return &wrappedListener{
  160. Listener: l,
  161. server: srv,
  162. }
  163. }
  164. func (wl *wrappedListener) Accept() (net.Conn, error) {
  165. var c net.Conn
  166. // Set keepalive on TCPListeners connections.
  167. if tcl, ok := wl.Listener.(*net.TCPListener); ok {
  168. tc, err := tcl.AcceptTCP()
  169. if err != nil {
  170. return nil, err
  171. }
  172. _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener
  173. _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
  174. c = tc
  175. } else {
  176. var err error
  177. c, err = wl.Listener.Accept()
  178. if err != nil {
  179. return nil, err
  180. }
  181. }
  182. closed := int32(0)
  183. c = &wrappedConn{
  184. Conn: c,
  185. server: wl.server,
  186. closed: &closed,
  187. perWriteTimeout: wl.server.PerWriteTimeout,
  188. perWritePerKbTimeout: wl.server.PerWritePerKbTimeout,
  189. }
  190. wl.server.wg.Add(1)
  191. return c, nil
  192. }
  193. func (wl *wrappedListener) Close() error {
  194. if wl.stopped {
  195. return syscall.EINVAL
  196. }
  197. wl.stopped = true
  198. return wl.Listener.Close()
  199. }
  200. func (wl *wrappedListener) File() (*os.File, error) {
  201. // returns a dup(2) - FD_CLOEXEC flag *not* set so the listening socket can be passed to child processes
  202. return wl.Listener.(filer).File()
  203. }
  204. type wrappedConn struct {
  205. net.Conn
  206. server *Server
  207. closed *int32
  208. deadline time.Time
  209. perWriteTimeout time.Duration
  210. perWritePerKbTimeout time.Duration
  211. }
  212. func (w *wrappedConn) Write(p []byte) (n int, err error) {
  213. if w.perWriteTimeout > 0 {
  214. minTimeout := time.Duration(len(p)/1024) * w.perWritePerKbTimeout
  215. minDeadline := time.Now().Add(minTimeout).Add(w.perWriteTimeout)
  216. w.deadline = w.deadline.Add(minTimeout)
  217. if minDeadline.After(w.deadline) {
  218. w.deadline = minDeadline
  219. }
  220. _ = w.Conn.SetWriteDeadline(w.deadline)
  221. }
  222. return w.Conn.Write(p)
  223. }
  224. func (w *wrappedConn) Close() error {
  225. if atomic.CompareAndSwapInt32(w.closed, 0, 1) {
  226. defer func() {
  227. if err := recover(); err != nil {
  228. select {
  229. case <-GetManager().IsHammer():
  230. // Likely deadlocked request released at hammertime
  231. log.Warn("Panic during connection close! %v. Likely there has been a deadlocked request which has been released by forced shutdown.", err)
  232. default:
  233. log.Error("Panic during connection close! %v", err)
  234. }
  235. }
  236. }()
  237. w.server.wg.Done()
  238. }
  239. return w.Conn.Close()
  240. }