選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

workerqueue.go 7.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "fmt"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "code.gitea.io/gitea/modules/json"
  11. "code.gitea.io/gitea/modules/log"
  12. "code.gitea.io/gitea/modules/process"
  13. "code.gitea.io/gitea/modules/setting"
  14. )
  15. // WorkerPoolQueue is a queue that uses a pool of workers to process items
  16. // It can use different underlying (base) queue types
  17. type WorkerPoolQueue[T any] struct {
  18. ctxRun context.Context
  19. ctxRunCancel context.CancelFunc
  20. shutdownDone chan struct{}
  21. shutdownTimeout atomic.Int64 // in case some buggy handlers (workers) would hang forever, "shutdown" should finish in predictable time
  22. origHandler HandlerFuncT[T]
  23. safeHandler HandlerFuncT[T]
  24. baseQueueType string
  25. baseConfig *BaseConfig
  26. baseQueue baseQueue
  27. batchChan chan []T
  28. flushChan chan flushType
  29. batchLength int
  30. workerNum int
  31. workerMaxNum int
  32. workerActiveNum int
  33. workerNumMu sync.Mutex
  34. }
  35. type flushType chan struct{}
  36. var _ ManagedWorkerPoolQueue = (*WorkerPoolQueue[any])(nil)
  37. func (q *WorkerPoolQueue[T]) GetName() string {
  38. return q.baseConfig.ManagedName
  39. }
  40. func (q *WorkerPoolQueue[T]) GetType() string {
  41. return q.baseQueueType
  42. }
  43. func (q *WorkerPoolQueue[T]) GetItemTypeName() string {
  44. var t T
  45. return fmt.Sprintf("%T", t)
  46. }
  47. func (q *WorkerPoolQueue[T]) GetWorkerNumber() int {
  48. q.workerNumMu.Lock()
  49. defer q.workerNumMu.Unlock()
  50. return q.workerNum
  51. }
  52. func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int {
  53. q.workerNumMu.Lock()
  54. defer q.workerNumMu.Unlock()
  55. return q.workerActiveNum
  56. }
  57. func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int {
  58. q.workerNumMu.Lock()
  59. defer q.workerNumMu.Unlock()
  60. return q.workerMaxNum
  61. }
  62. func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int) {
  63. q.workerNumMu.Lock()
  64. defer q.workerNumMu.Unlock()
  65. q.workerMaxNum = num
  66. }
  67. func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int {
  68. cnt, err := q.baseQueue.Len(q.ctxRun)
  69. if err != nil {
  70. log.Error("Failed to get number of items in queue %q: %v", q.GetName(), err)
  71. }
  72. return cnt
  73. }
  74. func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) (err error) {
  75. if q.isBaseQueueDummy() {
  76. return nil
  77. }
  78. log.Debug("Try to flush queue %q with timeout %v", q.GetName(), timeout)
  79. defer log.Debug("Finish flushing queue %q, err: %v", q.GetName(), err)
  80. var after <-chan time.Time
  81. after = infiniteTimerC
  82. if timeout > 0 {
  83. after = time.After(timeout)
  84. }
  85. c := make(flushType)
  86. // send flush request
  87. // if it blocks, it means that there is a flush in progress or the queue hasn't been started yet
  88. select {
  89. case q.flushChan <- c:
  90. case <-ctx.Done():
  91. return ctx.Err()
  92. case <-q.ctxRun.Done():
  93. return q.ctxRun.Err()
  94. case <-after:
  95. return context.DeadlineExceeded
  96. }
  97. // wait for flush to finish
  98. select {
  99. case <-c:
  100. return nil
  101. case <-ctx.Done():
  102. return ctx.Err()
  103. case <-q.ctxRun.Done():
  104. return q.ctxRun.Err()
  105. case <-after:
  106. return context.DeadlineExceeded
  107. }
  108. }
  109. // RemoveAllItems removes all items in the baes queue
  110. func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
  111. return q.baseQueue.RemoveAll(ctx)
  112. }
  113. func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
  114. bs, err := json.Marshal(data)
  115. if err != nil {
  116. log.Error("Failed to marshal item for queue %q: %v", q.GetName(), err)
  117. return nil
  118. }
  119. return bs
  120. }
  121. func (q *WorkerPoolQueue[T]) unmarshal(data []byte) (t T, ok bool) {
  122. if err := json.Unmarshal(data, &t); err != nil {
  123. log.Error("Failed to unmarshal item from queue %q: %v", q.GetName(), err)
  124. return t, false
  125. }
  126. return t, true
  127. }
  128. func (q *WorkerPoolQueue[T]) isBaseQueueDummy() bool {
  129. _, isDummy := q.baseQueue.(*baseDummy)
  130. return isDummy
  131. }
  132. // Push adds an item to the queue, it may block for a while and then returns an error if the queue is full
  133. func (q *WorkerPoolQueue[T]) Push(data T) error {
  134. if q.isBaseQueueDummy() && q.safeHandler != nil {
  135. // FIXME: the "immediate" queue is only for testing, but it really causes problems because its behavior is different from a real queue.
  136. // Even if tests pass, it doesn't mean that there is no bug in code.
  137. if data, ok := q.unmarshal(q.marshal(data)); ok {
  138. q.safeHandler(data)
  139. }
  140. }
  141. return q.baseQueue.PushItem(q.ctxRun, q.marshal(data))
  142. }
  143. // Has only works for unique queues. Keep in mind that this check may not be reliable (due to lacking of proper transaction support)
  144. // There could be a small chance that duplicate items appear in the queue
  145. func (q *WorkerPoolQueue[T]) Has(data T) (bool, error) {
  146. return q.baseQueue.HasItem(q.ctxRun, q.marshal(data))
  147. }
  148. func (q *WorkerPoolQueue[T]) Run() {
  149. q.doRun()
  150. }
  151. func (q *WorkerPoolQueue[T]) Cancel() {
  152. q.ctxRunCancel()
  153. }
  154. // ShutdownWait shuts down the queue, waits for all workers to finish their jobs, and pushes the unhandled items back to the base queue
  155. // It waits for all workers (handlers) to finish their jobs, in case some buggy handlers would hang forever, a reasonable timeout is needed
  156. func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
  157. q.shutdownTimeout.Store(int64(timeout))
  158. q.ctxRunCancel()
  159. <-q.shutdownDone
  160. }
  161. func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQueue, error)) {
  162. switch t {
  163. case "dummy", "immediate":
  164. return t, newBaseDummy
  165. case "channel":
  166. return t, newBaseChannelGeneric
  167. case "redis":
  168. return t, newBaseRedisGeneric
  169. default: // level(leveldb,levelqueue,persistable-channel)
  170. return "level", newBaseLevelQueueGeneric
  171. }
  172. }
  173. func newWorkerPoolQueueForTest[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
  174. return NewWorkerPoolQueueWithContext(context.Background(), name, queueSetting, handler, unique)
  175. }
  176. func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
  177. if handler == nil {
  178. log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name)
  179. queueSetting.Type = "dummy"
  180. }
  181. var w WorkerPoolQueue[T]
  182. var err error
  183. queueType, newQueueFn := getNewQueueFn(queueSetting.Type)
  184. w.baseQueueType = queueType
  185. w.baseConfig = toBaseConfig(name, queueSetting)
  186. w.baseQueue, err = newQueueFn(w.baseConfig, unique)
  187. if err != nil {
  188. return nil, err
  189. }
  190. log.Trace("Created queue %q of type %q", name, queueType)
  191. w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false)
  192. w.batchChan = make(chan []T)
  193. w.flushChan = make(chan flushType)
  194. w.shutdownDone = make(chan struct{})
  195. w.shutdownTimeout.Store(int64(shutdownDefaultTimeout))
  196. w.workerMaxNum = queueSetting.MaxWorkers
  197. w.batchLength = queueSetting.BatchLength
  198. w.origHandler = handler
  199. w.safeHandler = func(t ...T) (unhandled []T) {
  200. defer func() {
  201. err := recover()
  202. if err != nil {
  203. log.Error("Recovered from panic in queue %q handler: %v\n%s", name, err, log.Stack(2))
  204. }
  205. }()
  206. if w.origHandler != nil {
  207. return w.origHandler(t...)
  208. }
  209. return nil
  210. }
  211. return &w, nil
  212. }