You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

manager.go 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "fmt"
  7. "reflect"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "code.gitea.io/gitea/modules/json"
  13. "code.gitea.io/gitea/modules/log"
  14. )
  15. var manager *Manager
  16. // Manager is a queue manager
  17. type Manager struct {
  18. mutex sync.Mutex
  19. counter int64
  20. Queues map[int64]*ManagedQueue
  21. }
  22. // ManagedQueue represents a working queue with a Pool of workers.
  23. //
  24. // Although a ManagedQueue should really represent a Queue this does not
  25. // necessarily have to be the case. This could be used to describe any queue.WorkerPool.
  26. type ManagedQueue struct {
  27. mutex sync.Mutex
  28. QID int64
  29. Type Type
  30. Name string
  31. Configuration interface{}
  32. ExemplarType string
  33. Managed interface{}
  34. counter int64
  35. PoolWorkers map[int64]*PoolWorkers
  36. }
  37. // Flushable represents a pool or queue that is flushable
  38. type Flushable interface {
  39. // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
  40. Flush(time.Duration) error
  41. // FlushWithContext is very similar to Flush
  42. // NB: The worker will not be registered with the manager.
  43. FlushWithContext(ctx context.Context) error
  44. // IsEmpty will return if the managed pool is empty and has no work
  45. IsEmpty() bool
  46. }
  47. // Pausable represents a pool or queue that is Pausable
  48. type Pausable interface {
  49. // IsPaused will return if the pool or queue is paused
  50. IsPaused() bool
  51. // Pause will pause the pool or queue
  52. Pause()
  53. // Resume will resume the pool or queue
  54. Resume()
  55. // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
  56. IsPausedIsResumed() (paused, resumed <-chan struct{})
  57. }
  58. // ManagedPool is a simple interface to get certain details from a worker pool
  59. type ManagedPool interface {
  60. // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
  61. AddWorkers(number int, timeout time.Duration) context.CancelFunc
  62. // NumberOfWorkers returns the total number of workers in the pool
  63. NumberOfWorkers() int
  64. // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
  65. MaxNumberOfWorkers() int
  66. // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
  67. SetMaxNumberOfWorkers(int)
  68. // BoostTimeout returns the current timeout for worker groups created during a boost
  69. BoostTimeout() time.Duration
  70. // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
  71. BlockTimeout() time.Duration
  72. // BoostWorkers sets the number of workers to be created during a boost
  73. BoostWorkers() int
  74. // SetPoolSettings sets the user updatable settings for the pool
  75. SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
  76. // NumberInQueue returns the total number of items in the pool
  77. NumberInQueue() int64
  78. // Done returns a channel that will be closed when the Pool's baseCtx is closed
  79. Done() <-chan struct{}
  80. }
  81. // ManagedQueueList implements the sort.Interface
  82. type ManagedQueueList []*ManagedQueue
  83. // PoolWorkers represents a group of workers working on a queue
  84. type PoolWorkers struct {
  85. PID int64
  86. Workers int
  87. Start time.Time
  88. Timeout time.Time
  89. HasTimeout bool
  90. Cancel context.CancelFunc
  91. IsFlusher bool
  92. }
  93. // PoolWorkersList implements the sort.Interface for PoolWorkers
  94. type PoolWorkersList []*PoolWorkers
  95. func init() {
  96. _ = GetManager()
  97. }
  98. // GetManager returns a Manager and initializes one as singleton if there's none yet
  99. func GetManager() *Manager {
  100. if manager == nil {
  101. manager = &Manager{
  102. Queues: make(map[int64]*ManagedQueue),
  103. }
  104. }
  105. return manager
  106. }
  107. // Add adds a queue to this manager
  108. func (m *Manager) Add(managed interface{},
  109. t Type,
  110. configuration,
  111. exemplar interface{},
  112. ) int64 {
  113. cfg, _ := json.Marshal(configuration)
  114. mq := &ManagedQueue{
  115. Type: t,
  116. Configuration: string(cfg),
  117. ExemplarType: reflect.TypeOf(exemplar).String(),
  118. PoolWorkers: make(map[int64]*PoolWorkers),
  119. Managed: managed,
  120. }
  121. m.mutex.Lock()
  122. m.counter++
  123. mq.QID = m.counter
  124. mq.Name = fmt.Sprintf("queue-%d", mq.QID)
  125. if named, ok := managed.(Named); ok {
  126. name := named.Name()
  127. if len(name) > 0 {
  128. mq.Name = name
  129. }
  130. }
  131. m.Queues[mq.QID] = mq
  132. m.mutex.Unlock()
  133. log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
  134. return mq.QID
  135. }
  136. // Remove a queue from the Manager
  137. func (m *Manager) Remove(qid int64) {
  138. m.mutex.Lock()
  139. delete(m.Queues, qid)
  140. m.mutex.Unlock()
  141. log.Trace("Queue Manager removed: QID: %d", qid)
  142. }
  143. // GetManagedQueue by qid
  144. func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
  145. m.mutex.Lock()
  146. defer m.mutex.Unlock()
  147. return m.Queues[qid]
  148. }
  149. // FlushAll flushes all the flushable queues attached to this manager
  150. func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
  151. var ctx context.Context
  152. var cancel context.CancelFunc
  153. start := time.Now()
  154. end := start
  155. hasTimeout := false
  156. if timeout > 0 {
  157. ctx, cancel = context.WithTimeout(baseCtx, timeout)
  158. end = start.Add(timeout)
  159. hasTimeout = true
  160. } else {
  161. ctx, cancel = context.WithCancel(baseCtx)
  162. }
  163. defer cancel()
  164. for {
  165. select {
  166. case <-ctx.Done():
  167. mqs := m.ManagedQueues()
  168. nonEmptyQueues := []string{}
  169. for _, mq := range mqs {
  170. if !mq.IsEmpty() {
  171. nonEmptyQueues = append(nonEmptyQueues, mq.Name)
  172. }
  173. }
  174. if len(nonEmptyQueues) > 0 {
  175. return fmt.Errorf("flush timeout with non-empty queues: %s", strings.Join(nonEmptyQueues, ", "))
  176. }
  177. return nil
  178. default:
  179. }
  180. mqs := m.ManagedQueues()
  181. log.Debug("Found %d Managed Queues", len(mqs))
  182. wg := sync.WaitGroup{}
  183. wg.Add(len(mqs))
  184. allEmpty := true
  185. for _, mq := range mqs {
  186. if mq.IsEmpty() {
  187. wg.Done()
  188. continue
  189. }
  190. if pausable, ok := mq.Managed.(Pausable); ok {
  191. // no point flushing paused queues
  192. if pausable.IsPaused() {
  193. wg.Done()
  194. continue
  195. }
  196. }
  197. if pool, ok := mq.Managed.(ManagedPool); ok {
  198. // No point into flushing pools when their base's ctx is already done.
  199. select {
  200. case <-pool.Done():
  201. wg.Done()
  202. continue
  203. default:
  204. }
  205. }
  206. allEmpty = false
  207. if flushable, ok := mq.Managed.(Flushable); ok {
  208. log.Debug("Flushing (flushable) queue: %s", mq.Name)
  209. go func(q *ManagedQueue) {
  210. localCtx, localCtxCancel := context.WithCancel(ctx)
  211. pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true)
  212. err := flushable.FlushWithContext(localCtx)
  213. if err != nil && err != ctx.Err() {
  214. cancel()
  215. }
  216. q.CancelWorkers(pid)
  217. localCtxCancel()
  218. wg.Done()
  219. }(mq)
  220. } else {
  221. log.Debug("Queue: %s is non-empty but is not flushable", mq.Name)
  222. wg.Done()
  223. }
  224. }
  225. if allEmpty {
  226. log.Debug("All queues are empty")
  227. break
  228. }
  229. // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
  230. // but don't delay cancellation here.
  231. select {
  232. case <-ctx.Done():
  233. case <-time.After(100 * time.Millisecond):
  234. }
  235. wg.Wait()
  236. }
  237. return nil
  238. }
  239. // ManagedQueues returns the managed queues
  240. func (m *Manager) ManagedQueues() []*ManagedQueue {
  241. m.mutex.Lock()
  242. mqs := make([]*ManagedQueue, 0, len(m.Queues))
  243. for _, mq := range m.Queues {
  244. mqs = append(mqs, mq)
  245. }
  246. m.mutex.Unlock()
  247. sort.Sort(ManagedQueueList(mqs))
  248. return mqs
  249. }
  250. // Workers returns the poolworkers
  251. func (q *ManagedQueue) Workers() []*PoolWorkers {
  252. q.mutex.Lock()
  253. workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
  254. for _, worker := range q.PoolWorkers {
  255. workers = append(workers, worker)
  256. }
  257. q.mutex.Unlock()
  258. sort.Sort(PoolWorkersList(workers))
  259. return workers
  260. }
  261. // RegisterWorkers registers workers to this queue
  262. func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
  263. q.mutex.Lock()
  264. defer q.mutex.Unlock()
  265. q.counter++
  266. q.PoolWorkers[q.counter] = &PoolWorkers{
  267. PID: q.counter,
  268. Workers: number,
  269. Start: start,
  270. Timeout: timeout,
  271. HasTimeout: hasTimeout,
  272. Cancel: cancel,
  273. IsFlusher: isFlusher,
  274. }
  275. return q.counter
  276. }
  277. // CancelWorkers cancels pooled workers with pid
  278. func (q *ManagedQueue) CancelWorkers(pid int64) {
  279. q.mutex.Lock()
  280. pw, ok := q.PoolWorkers[pid]
  281. q.mutex.Unlock()
  282. if !ok {
  283. return
  284. }
  285. pw.Cancel()
  286. }
  287. // RemoveWorkers deletes pooled workers with pid
  288. func (q *ManagedQueue) RemoveWorkers(pid int64) {
  289. q.mutex.Lock()
  290. pw, ok := q.PoolWorkers[pid]
  291. delete(q.PoolWorkers, pid)
  292. q.mutex.Unlock()
  293. if ok && pw.Cancel != nil {
  294. pw.Cancel()
  295. }
  296. }
  297. // AddWorkers adds workers to the queue if it has registered an add worker function
  298. func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
  299. if pool, ok := q.Managed.(ManagedPool); ok {
  300. // the cancel will be added to the pool workers description above
  301. return pool.AddWorkers(number, timeout)
  302. }
  303. return nil
  304. }
  305. // Flushable returns true if the queue is flushable
  306. func (q *ManagedQueue) Flushable() bool {
  307. _, ok := q.Managed.(Flushable)
  308. return ok
  309. }
  310. // Flush flushes the queue with a timeout
  311. func (q *ManagedQueue) Flush(timeout time.Duration) error {
  312. if flushable, ok := q.Managed.(Flushable); ok {
  313. // the cancel will be added to the pool workers description above
  314. return flushable.Flush(timeout)
  315. }
  316. return nil
  317. }
  318. // IsEmpty returns if the queue is empty
  319. func (q *ManagedQueue) IsEmpty() bool {
  320. if flushable, ok := q.Managed.(Flushable); ok {
  321. return flushable.IsEmpty()
  322. }
  323. return true
  324. }
  325. // Pausable returns whether the queue is Pausable
  326. func (q *ManagedQueue) Pausable() bool {
  327. _, ok := q.Managed.(Pausable)
  328. return ok
  329. }
  330. // Pause pauses the queue
  331. func (q *ManagedQueue) Pause() {
  332. if pausable, ok := q.Managed.(Pausable); ok {
  333. pausable.Pause()
  334. }
  335. }
  336. // IsPaused reveals if the queue is paused
  337. func (q *ManagedQueue) IsPaused() bool {
  338. if pausable, ok := q.Managed.(Pausable); ok {
  339. return pausable.IsPaused()
  340. }
  341. return false
  342. }
  343. // Resume resumes the queue
  344. func (q *ManagedQueue) Resume() {
  345. if pausable, ok := q.Managed.(Pausable); ok {
  346. pausable.Resume()
  347. }
  348. }
  349. // NumberOfWorkers returns the number of workers in the queue
  350. func (q *ManagedQueue) NumberOfWorkers() int {
  351. if pool, ok := q.Managed.(ManagedPool); ok {
  352. return pool.NumberOfWorkers()
  353. }
  354. return -1
  355. }
  356. // MaxNumberOfWorkers returns the maximum number of workers for the pool
  357. func (q *ManagedQueue) MaxNumberOfWorkers() int {
  358. if pool, ok := q.Managed.(ManagedPool); ok {
  359. return pool.MaxNumberOfWorkers()
  360. }
  361. return 0
  362. }
  363. // BoostWorkers returns the number of workers for a boost
  364. func (q *ManagedQueue) BoostWorkers() int {
  365. if pool, ok := q.Managed.(ManagedPool); ok {
  366. return pool.BoostWorkers()
  367. }
  368. return -1
  369. }
  370. // BoostTimeout returns the timeout of the next boost
  371. func (q *ManagedQueue) BoostTimeout() time.Duration {
  372. if pool, ok := q.Managed.(ManagedPool); ok {
  373. return pool.BoostTimeout()
  374. }
  375. return 0
  376. }
  377. // BlockTimeout returns the timeout til the next boost
  378. func (q *ManagedQueue) BlockTimeout() time.Duration {
  379. if pool, ok := q.Managed.(ManagedPool); ok {
  380. return pool.BlockTimeout()
  381. }
  382. return 0
  383. }
  384. // SetPoolSettings sets the setable boost values
  385. func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
  386. if pool, ok := q.Managed.(ManagedPool); ok {
  387. pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
  388. }
  389. }
  390. // NumberInQueue returns the number of items in the queue
  391. func (q *ManagedQueue) NumberInQueue() int64 {
  392. if pool, ok := q.Managed.(ManagedPool); ok {
  393. return pool.NumberInQueue()
  394. }
  395. return -1
  396. }
  397. func (l ManagedQueueList) Len() int {
  398. return len(l)
  399. }
  400. func (l ManagedQueueList) Less(i, j int) bool {
  401. return l[i].Name < l[j].Name
  402. }
  403. func (l ManagedQueueList) Swap(i, j int) {
  404. l[i], l[j] = l[j], l[i]
  405. }
  406. func (l PoolWorkersList) Len() int {
  407. return len(l)
  408. }
  409. func (l PoolWorkersList) Less(i, j int) bool {
  410. return l[i].Start.Before(l[j].Start)
  411. }
  412. func (l PoolWorkersList) Swap(i, j int) {
  413. l[i], l[j] = l[j], l[i]
  414. }