workergroup.go

// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"runtime/pprof"
	"sync"
	"sync/atomic"
	"time"

	"code.gitea.io/gitea/modules/log"
)

var (
	infiniteTimerC         = make(chan time.Time)
	batchDebounceDuration  = 100 * time.Millisecond
	workerIdleDuration     = 1 * time.Second
	shutdownDefaultTimeout = 2 * time.Second

	unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
)

func init() {
	unhandledItemRequeueDuration.Store(int64(5 * time.Second))
}
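
// Note: these variables control the queue's timing behavior: batchDebounceDuration is how long doRun
// waits for more items before dispatching a partial batch, workerIdleDuration is how long an idle worker
// waits before exiting, and unhandledItemRequeueDuration is the back-off applied when a whole batch fails.
// They are package variables rather than constants, presumably so tests can adjust them.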

// workerGroup is a group of workers to work with a WorkerPoolQueue
type workerGroup[T any] struct {
	q  *WorkerPoolQueue[T]
	wg sync.WaitGroup

	ctxWorker       context.Context
	ctxWorkerCancel context.CancelFunc

	batchBuffer []T
	popItemChan chan []byte
	popItemErr  chan error
}
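
// How the fields above are used in this file: batchBuffer accumulates items popped from the base queue
// until a batch is dispatched, popItemChan/popItemErr carry raw data and errors from the background
// popItemByChan goroutine, and ctxWorker/ctxWorkerCancel let doFlush stop all workers and then restart
// them with a fresh context via doPrepareWorkerContext.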

func (wg *workerGroup[T]) doPrepareWorkerContext() {
	wg.ctxWorker, wg.ctxWorkerCancel = context.WithCancel(wg.q.ctxRun)
}

// doDispatchBatchToWorker dispatches a batch of items to the worker's channel.
// If the channel is full, it tries to start a new worker if possible.
func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushChan chan flushType) {
	batch := wg.batchBuffer
	wg.batchBuffer = nil

	if len(batch) == 0 {
		return
	}

	full := false
	select {
	case q.batchChan <- batch:
	default:
		full = true
	}

	// TODO: the logic could be improved in the future, to avoid a data race between "doStartNewWorker" and "workerNum".
	// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later,
	// so ideally it should check whether there are enough workers in some way, and start new workers if necessary.
	// This data race is not serious: as long as a new worker is started soon to make sure there are enough workers,
	// there is no need for a big refactor at the moment.
	q.workerNumMu.Lock()
	noWorker := q.workerNum == 0
	if full || noWorker {
		if q.workerNum < q.workerMaxNum || (noWorker && q.workerMaxNum <= 0) {
			q.workerNum++
			q.doStartNewWorker(wg)
		}
	}
	q.workerNumMu.Unlock()

	if full {
		select {
		case q.batchChan <- batch:
		case flush := <-flushChan:
			q.doWorkerHandle(batch)
			q.doFlush(wg, flush)
		case <-q.ctxRun.Done():
			wg.batchBuffer = batch // return the batch to the buffer; the "doRun" function will handle it
		}
	}
}
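
// Note on the "full" path above: if the non-blocking send fails, the dispatcher blocks until either a
// worker frees up, a flush request arrives (in which case the dispatcher handles the batch itself so the
// flush can proceed), or the run context is canceled (in which case the batch is put back for shutdown handling).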

// doWorkerHandle calls the safeHandler to handle a batch of items, and it increases/decreases the active worker number.
// If the context has been canceled, it should not be called, because "Push" still needs the context; in that case, call q.safeHandler directly.
func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
	q.workerNumMu.Lock()
	q.workerActiveNum++
	q.workerNumMu.Unlock()

	defer func() {
		q.workerNumMu.Lock()
		q.workerActiveNum--
		q.workerNumMu.Unlock()
	}()

	unhandled := q.safeHandler(batch...)
	// if none of the items were handled, back off for a few seconds;
	// in this case the handler (e.g. a document indexer) may have encountered some errors/failures
	if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
		log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
		select {
		case <-q.ctxRun.Done():
		case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
		}
	}

	for _, item := range unhandled {
		if err := q.Push(item); err != nil {
			if !q.basePushForShutdown(item) {
				log.Error("Failed to requeue item for queue %q when calling handler: %v", q.GetName(), err)
			}
		}
	}
}
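
// Requeue path (above): items the handler reports as unhandled are pushed back via q.Push; if that fails
// (e.g. because the queue is already shutting down), the code falls back to basePushForShutdown and only
// logs an error when that fallback is unavailable too.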

// basePushForShutdown tries to requeue items into the base queue when the WorkerPoolQueue is shutting down.
// If the queue is shutting down, it returns true and tries to push the items.
// Otherwise it does nothing and returns false.
func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
	shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
	if shutdownTimeout == 0 {
		return false
	}
	ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer ctxShutdownCancel()
	for _, item := range items {
		// if pushing still fails here, the queue can do nothing but log the error; the item is lost
		if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
			log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
		}
	}
	return true
}
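
// Note: a zero shutdownTimeout is treated as "not shutting down", so the caller falls through to logging
// the original error instead. The push uses a fresh context.Background()-derived timeout, presumably
// because the queue's own run context may already be canceled at this point.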

func resetIdleTicker(t *time.Ticker, dur time.Duration) {
	t.Reset(dur)
	select {
	case <-t.C:
	default:
	}
}
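
// resetIdleTicker drains at most one pending tick after Reset, so a tick that fired while a worker was
// busy handling a batch does not immediately count as "idle" on the next loop iteration.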

// doStartNewWorker starts a new worker for the queue; the worker reads from the worker's channel and handles the items.
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
	wp.wg.Add(1)

	go func() {
		defer wp.wg.Done()

		log.Debug("Queue %q starts new worker", q.GetName())
		defer log.Debug("Queue %q stops idle worker", q.GetName())

		t := time.NewTicker(workerIdleDuration)
		defer t.Stop()

		keepWorking := true
		stopWorking := func() {
			q.workerNumMu.Lock()
			keepWorking = false
			q.workerNum--
			q.workerNumMu.Unlock()
		}
		for keepWorking {
			select {
			case <-wp.ctxWorker.Done():
				stopWorking()
			case batch, ok := <-q.batchChan:
				if !ok {
					stopWorking()
					continue
				}
				q.doWorkerHandle(batch)
				// reset the idle ticker, and drain the tick after the reset in case a tick has already been triggered
				resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
			case <-t.C:
				q.workerNumMu.Lock()
				keepWorking = q.workerNum <= 1 // keep the last worker running
				if !keepWorking {
					q.workerNum--
				}
				q.workerNumMu.Unlock()
			}
		}
	}()
}
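
// Idle exit (loop above): when the idle ticker fires, the worker exits unless it is the last one, so a
// single worker stays alive between bursts of work; workers also stop when the worker context is canceled
// (e.g. by doFlush) or when the batch channel is closed during shutdown.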

// doFlush flushes the queue: it tries to read all items from the queue and handles them.
// It is for testing purposes only. It's not designed to work for a cluster.
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
	log.Debug("Queue %q starts flushing", q.GetName())
	defer log.Debug("Queue %q finishes flushing", q.GetName())

	// stop all workers, and prepare a new worker context to start new workers
	wg.ctxWorkerCancel()
	wg.wg.Wait()

	defer func() {
		close(flush)
		wg.doPrepareWorkerContext()
	}()

	// drain the batch channel first
loop:
	for {
		select {
		case batch := <-q.batchChan:
			q.doWorkerHandle(batch)
		default:
			break loop
		}
	}

	// drain the popItem channel
	emptyCounter := 0
	for {
		select {
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			emptyCounter = 0
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				continue
			} else {
				q.doWorkerHandle([]T{v})
			}
		case err := <-wg.popItemErr:
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
			}
			return
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		case <-time.After(20 * time.Millisecond):
			// There is no reliable way to make sure all queue items are consumed by the Flush; there might always be some items stored in buffers or temporary variables.
			// If Gitea runs in a cluster, we can't even guarantee that all items are consumed by a deterministic instance.
			// Luckily, the "Flush" trick is only used in tests, so far so good.
			if cnt, _ := q.baseQueue.Len(q.ctxRun); cnt == 0 && len(wg.popItemChan) == 0 {
				emptyCounter++
			}
			if emptyCounter >= 2 {
				return
			}
		}
	}
}
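
// Termination heuristic for the drain loop above: every 20ms without new data, the flush checks whether
// the base queue and the pop channel both look empty, and only returns after two consecutive empty checks,
// which gives items that are briefly in flight between the base queue and popItemChan a chance to arrive.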

func (q *WorkerPoolQueue[T]) isCtxRunCanceled() bool {
	select {
	case <-q.ctxRun.Done():
		return true
	default:
		return false
	}
}

var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip reading other flush requests

// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
func (q *WorkerPoolQueue[T]) doRun() {
	pprof.SetGoroutineLabels(q.ctxRun)

	log.Debug("Queue %q starts running", q.GetName())
	defer log.Debug("Queue %q stops running", q.GetName())

	wg := &workerGroup[T]{q: q}
	wg.doPrepareWorkerContext()
	wg.popItemChan, wg.popItemErr = popItemByChan(q.ctxRun, q.baseQueue.PopItem)

	defer func() {
		q.ctxRunCancel()

		// drain all data on the fly
		// since the queue is shutting down, the items can't be dispatched to workers because the context is canceled;
		// it can't call doWorkerHandle either, because there is no chance to push unhandled items back to the queue
		var unhandled []T
		close(q.batchChan)
		for batch := range q.batchChan {
			unhandled = append(unhandled, batch...)
		}
		unhandled = append(unhandled, wg.batchBuffer...)
		for data := range wg.popItemChan {
			if v, ok := q.unmarshal(data); ok {
				unhandled = append(unhandled, v)
			}
		}

		shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
		if shutdownTimeout != 0 {
			// if there is a shutdown context, try to push the items back to the base queue
			q.basePushForShutdown(unhandled...)
			workerDone := make(chan struct{})
			// waiting on the WaitGroup is the only way to wait for the workers, because the handlers do not have a context to wait on
			go func() { wg.wg.Wait(); close(workerDone) }()
			select {
			case <-workerDone:
			case <-time.After(shutdownTimeout):
				log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
			}
		} else {
			// if there is no shutdown context, just call the handler to try to handle the items. If the handler fails again, the items are lost.
			q.safeHandler(unhandled...)
		}

		close(q.shutdownDone)
	}()

	var batchDispatchC <-chan time.Time = infiniteTimerC
	for {
		select {
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				testRecorder.Record("pop:corrupted:%s", data) // in rare cases the levelqueue (leveldb) might be corrupted
				continue
			} else {
				wg.batchBuffer = append(wg.batchBuffer, v)
			}
			if len(wg.batchBuffer) >= q.batchLength {
				q.doDispatchBatchToWorker(wg, q.flushChan)
			} else if batchDispatchC == infiniteTimerC {
				batchDispatchC = time.After(batchDebounceDuration)
			} // else: batchDispatchC is already a debounce timer, it will be triggered soon
		case <-batchDispatchC:
			batchDispatchC = infiniteTimerC
			q.doDispatchBatchToWorker(wg, q.flushChan)
		case flush := <-q.flushChan:
			// before flushing, it needs to try to dispatch the batch to a worker first, in case there is no worker running;
			// after the dispatching, there is at least one worker running, so "doFlush" can wait for the workers to finish.
			// Since we are already in a "flush" operation, the dispatching function shouldn't read the flush chan.
			q.doDispatchBatchToWorker(wg, skipFlushChan)
			q.doFlush(wg, flush)
		case err := <-wg.popItemErr:
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
			}
			return
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		}
	}
}
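
// A rough sketch of the handler contract this loop relies on (inferred from how safeHandler is used in
// this file, not from the handler's own definition): the handler receives a batch and returns the items
// it could not process, e.g.
//
//	handler := func(items ...string) (unhandled []string) {
//		for _, it := range items {
//			if err := process(it); err != nil { // "process" is a hypothetical application function
//				unhandled = append(unhandled, it)
//			}
//		}
//		return unhandled
//	}
//
// Returning every item makes doWorkerHandle back off before requeueing; returning nil means the whole
// batch was handled.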