You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

manager.go 9.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "encoding/json"
  8. "fmt"
  9. "reflect"
  10. "sort"
  11. "sync"
  12. "time"
  13. "code.gitea.io/gitea/modules/log"
  14. )
  15. var manager *Manager
  16. // Manager is a queue manager
  17. type Manager struct {
  18. mutex sync.Mutex
  19. counter int64
  20. Queues map[int64]*ManagedQueue
  21. }
  22. // ManagedQueue represents a working queue with a Pool of workers.
  23. //
  24. // Although a ManagedQueue should really represent a Queue this does not
  25. // necessarily have to be the case. This could be used to describe any queue.WorkerPool.
  26. type ManagedQueue struct {
  27. mutex sync.Mutex
  28. QID int64
  29. Type Type
  30. Name string
  31. Configuration interface{}
  32. ExemplarType string
  33. Managed interface{}
  34. counter int64
  35. PoolWorkers map[int64]*PoolWorkers
  36. }
  37. // Flushable represents a pool or queue that is flushable
  38. type Flushable interface {
  39. // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
  40. Flush(time.Duration) error
  41. // FlushWithContext is very similar to Flush
  42. // NB: The worker will not be registered with the manager.
  43. FlushWithContext(ctx context.Context) error
  44. // IsEmpty will return if the managed pool is empty and has no work
  45. IsEmpty() bool
  46. }
  47. // ManagedPool is a simple interface to get certain details from a worker pool
  48. type ManagedPool interface {
  49. // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
  50. AddWorkers(number int, timeout time.Duration) context.CancelFunc
  51. // NumberOfWorkers returns the total number of workers in the pool
  52. NumberOfWorkers() int
  53. // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
  54. MaxNumberOfWorkers() int
  55. // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
  56. SetMaxNumberOfWorkers(int)
  57. // BoostTimeout returns the current timeout for worker groups created during a boost
  58. BoostTimeout() time.Duration
  59. // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
  60. BlockTimeout() time.Duration
  61. // BoostWorkers sets the number of workers to be created during a boost
  62. BoostWorkers() int
  63. // SetPoolSettings sets the user updatable settings for the pool
  64. SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
  65. }
  66. // ManagedQueueList implements the sort.Interface
  67. type ManagedQueueList []*ManagedQueue
  68. // PoolWorkers represents a group of workers working on a queue
  69. type PoolWorkers struct {
  70. PID int64
  71. Workers int
  72. Start time.Time
  73. Timeout time.Time
  74. HasTimeout bool
  75. Cancel context.CancelFunc
  76. IsFlusher bool
  77. }
  78. // PoolWorkersList implements the sort.Interface for PoolWorkers
  79. type PoolWorkersList []*PoolWorkers
  80. func init() {
  81. _ = GetManager()
  82. }
  83. // GetManager returns a Manager and initializes one as singleton if there's none yet
  84. func GetManager() *Manager {
  85. if manager == nil {
  86. manager = &Manager{
  87. Queues: make(map[int64]*ManagedQueue),
  88. }
  89. }
  90. return manager
  91. }
  92. // Add adds a queue to this manager
  93. func (m *Manager) Add(managed interface{},
  94. t Type,
  95. configuration,
  96. exemplar interface{}) int64 {
  97. cfg, _ := json.Marshal(configuration)
  98. mq := &ManagedQueue{
  99. Type: t,
  100. Configuration: string(cfg),
  101. ExemplarType: reflect.TypeOf(exemplar).String(),
  102. PoolWorkers: make(map[int64]*PoolWorkers),
  103. Managed: managed,
  104. }
  105. m.mutex.Lock()
  106. m.counter++
  107. mq.QID = m.counter
  108. mq.Name = fmt.Sprintf("queue-%d", mq.QID)
  109. if named, ok := managed.(Named); ok {
  110. name := named.Name()
  111. if len(name) > 0 {
  112. mq.Name = name
  113. }
  114. }
  115. m.Queues[mq.QID] = mq
  116. m.mutex.Unlock()
  117. log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
  118. return mq.QID
  119. }
  120. // Remove a queue from the Manager
  121. func (m *Manager) Remove(qid int64) {
  122. m.mutex.Lock()
  123. delete(m.Queues, qid)
  124. m.mutex.Unlock()
  125. log.Trace("Queue Manager removed: QID: %d", qid)
  126. }
  127. // GetManagedQueue by qid
  128. func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
  129. m.mutex.Lock()
  130. defer m.mutex.Unlock()
  131. return m.Queues[qid]
  132. }
  133. // FlushAll flushes all the flushable queues attached to this manager
  134. func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
  135. var ctx context.Context
  136. var cancel context.CancelFunc
  137. start := time.Now()
  138. end := start
  139. hasTimeout := false
  140. if timeout > 0 {
  141. ctx, cancel = context.WithTimeout(baseCtx, timeout)
  142. end = start.Add(timeout)
  143. hasTimeout = true
  144. } else {
  145. ctx, cancel = context.WithCancel(baseCtx)
  146. }
  147. defer cancel()
  148. for {
  149. select {
  150. case <-ctx.Done():
  151. return ctx.Err()
  152. default:
  153. }
  154. mqs := m.ManagedQueues()
  155. wg := sync.WaitGroup{}
  156. wg.Add(len(mqs))
  157. allEmpty := true
  158. for _, mq := range mqs {
  159. if mq.IsEmpty() {
  160. wg.Done()
  161. continue
  162. }
  163. allEmpty = false
  164. if flushable, ok := mq.Managed.(Flushable); ok {
  165. go func(q *ManagedQueue) {
  166. localCtx, localCancel := context.WithCancel(ctx)
  167. pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
  168. err := flushable.FlushWithContext(localCtx)
  169. if err != nil && err != ctx.Err() {
  170. cancel()
  171. }
  172. q.CancelWorkers(pid)
  173. localCancel()
  174. wg.Done()
  175. }(mq)
  176. } else {
  177. wg.Done()
  178. }
  179. }
  180. if allEmpty {
  181. break
  182. }
  183. wg.Wait()
  184. }
  185. return nil
  186. }
  187. // ManagedQueues returns the managed queues
  188. func (m *Manager) ManagedQueues() []*ManagedQueue {
  189. m.mutex.Lock()
  190. mqs := make([]*ManagedQueue, 0, len(m.Queues))
  191. for _, mq := range m.Queues {
  192. mqs = append(mqs, mq)
  193. }
  194. m.mutex.Unlock()
  195. sort.Sort(ManagedQueueList(mqs))
  196. return mqs
  197. }
  198. // Workers returns the poolworkers
  199. func (q *ManagedQueue) Workers() []*PoolWorkers {
  200. q.mutex.Lock()
  201. workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
  202. for _, worker := range q.PoolWorkers {
  203. workers = append(workers, worker)
  204. }
  205. q.mutex.Unlock()
  206. sort.Sort(PoolWorkersList(workers))
  207. return workers
  208. }
  209. // RegisterWorkers registers workers to this queue
  210. func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
  211. q.mutex.Lock()
  212. defer q.mutex.Unlock()
  213. q.counter++
  214. q.PoolWorkers[q.counter] = &PoolWorkers{
  215. PID: q.counter,
  216. Workers: number,
  217. Start: start,
  218. Timeout: timeout,
  219. HasTimeout: hasTimeout,
  220. Cancel: cancel,
  221. IsFlusher: isFlusher,
  222. }
  223. return q.counter
  224. }
  225. // CancelWorkers cancels pooled workers with pid
  226. func (q *ManagedQueue) CancelWorkers(pid int64) {
  227. q.mutex.Lock()
  228. pw, ok := q.PoolWorkers[pid]
  229. q.mutex.Unlock()
  230. if !ok {
  231. return
  232. }
  233. pw.Cancel()
  234. }
  235. // RemoveWorkers deletes pooled workers with pid
  236. func (q *ManagedQueue) RemoveWorkers(pid int64) {
  237. q.mutex.Lock()
  238. pw, ok := q.PoolWorkers[pid]
  239. delete(q.PoolWorkers, pid)
  240. q.mutex.Unlock()
  241. if ok && pw.Cancel != nil {
  242. pw.Cancel()
  243. }
  244. }
  245. // AddWorkers adds workers to the queue if it has registered an add worker function
  246. func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
  247. if pool, ok := q.Managed.(ManagedPool); ok {
  248. // the cancel will be added to the pool workers description above
  249. return pool.AddWorkers(number, timeout)
  250. }
  251. return nil
  252. }
  253. // Flush flushes the queue with a timeout
  254. func (q *ManagedQueue) Flush(timeout time.Duration) error {
  255. if flushable, ok := q.Managed.(Flushable); ok {
  256. // the cancel will be added to the pool workers description above
  257. return flushable.Flush(timeout)
  258. }
  259. return nil
  260. }
  261. // IsEmpty returns if the queue is empty
  262. func (q *ManagedQueue) IsEmpty() bool {
  263. if flushable, ok := q.Managed.(Flushable); ok {
  264. return flushable.IsEmpty()
  265. }
  266. return true
  267. }
  268. // NumberOfWorkers returns the number of workers in the queue
  269. func (q *ManagedQueue) NumberOfWorkers() int {
  270. if pool, ok := q.Managed.(ManagedPool); ok {
  271. return pool.NumberOfWorkers()
  272. }
  273. return -1
  274. }
  275. // MaxNumberOfWorkers returns the maximum number of workers for the pool
  276. func (q *ManagedQueue) MaxNumberOfWorkers() int {
  277. if pool, ok := q.Managed.(ManagedPool); ok {
  278. return pool.MaxNumberOfWorkers()
  279. }
  280. return 0
  281. }
  282. // BoostWorkers returns the number of workers for a boost
  283. func (q *ManagedQueue) BoostWorkers() int {
  284. if pool, ok := q.Managed.(ManagedPool); ok {
  285. return pool.BoostWorkers()
  286. }
  287. return -1
  288. }
  289. // BoostTimeout returns the timeout of the next boost
  290. func (q *ManagedQueue) BoostTimeout() time.Duration {
  291. if pool, ok := q.Managed.(ManagedPool); ok {
  292. return pool.BoostTimeout()
  293. }
  294. return 0
  295. }
  296. // BlockTimeout returns the timeout til the next boost
  297. func (q *ManagedQueue) BlockTimeout() time.Duration {
  298. if pool, ok := q.Managed.(ManagedPool); ok {
  299. return pool.BlockTimeout()
  300. }
  301. return 0
  302. }
  303. // SetPoolSettings sets the setable boost values
  304. func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
  305. if pool, ok := q.Managed.(ManagedPool); ok {
  306. pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
  307. }
  308. }
  309. func (l ManagedQueueList) Len() int {
  310. return len(l)
  311. }
  312. func (l ManagedQueueList) Less(i, j int) bool {
  313. return l[i].Name < l[j].Name
  314. }
  315. func (l ManagedQueueList) Swap(i, j int) {
  316. l[i], l[j] = l[j], l[i]
  317. }
  318. func (l PoolWorkersList) Len() int {
  319. return len(l)
  320. }
  321. func (l PoolWorkersList) Less(i, j int) bool {
  322. return l[i].Start.Before(l[j].Start)
  323. }
  324. func (l PoolWorkersList) Swap(i, j int) {
  325. l[i], l[j] = l[j], l[i]
  326. }