You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

manager.go 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "fmt"
  8. "reflect"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "time"
  13. "code.gitea.io/gitea/modules/json"
  14. "code.gitea.io/gitea/modules/log"
  15. )
  16. var manager *Manager
  17. // Manager is a queue manager
  18. type Manager struct {
  19. mutex sync.Mutex
  20. counter int64
  21. Queues map[int64]*ManagedQueue
  22. }
  23. // ManagedQueue represents a working queue with a Pool of workers.
  24. //
  25. // Although a ManagedQueue should really represent a Queue this does not
  26. // necessarily have to be the case. This could be used to describe any queue.WorkerPool.
  27. type ManagedQueue struct {
  28. mutex sync.Mutex
  29. QID int64
  30. Type Type
  31. Name string
  32. Configuration interface{}
  33. ExemplarType string
  34. Managed interface{}
  35. counter int64
  36. PoolWorkers map[int64]*PoolWorkers
  37. }
  38. // Flushable represents a pool or queue that is flushable
  39. type Flushable interface {
  40. // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
  41. Flush(time.Duration) error
  42. // FlushWithContext is very similar to Flush
  43. // NB: The worker will not be registered with the manager.
  44. FlushWithContext(ctx context.Context) error
  45. // IsEmpty will return if the managed pool is empty and has no work
  46. IsEmpty() bool
  47. }
  48. // ManagedPool is a simple interface to get certain details from a worker pool
  49. type ManagedPool interface {
  50. // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
  51. AddWorkers(number int, timeout time.Duration) context.CancelFunc
  52. // NumberOfWorkers returns the total number of workers in the pool
  53. NumberOfWorkers() int
  54. // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
  55. MaxNumberOfWorkers() int
  56. // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
  57. SetMaxNumberOfWorkers(int)
  58. // BoostTimeout returns the current timeout for worker groups created during a boost
  59. BoostTimeout() time.Duration
  60. // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
  61. BlockTimeout() time.Duration
  62. // BoostWorkers sets the number of workers to be created during a boost
  63. BoostWorkers() int
  64. // SetPoolSettings sets the user updatable settings for the pool
  65. SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
  66. }
  67. // ManagedQueueList implements the sort.Interface
  68. type ManagedQueueList []*ManagedQueue
  69. // PoolWorkers represents a group of workers working on a queue
  70. type PoolWorkers struct {
  71. PID int64
  72. Workers int
  73. Start time.Time
  74. Timeout time.Time
  75. HasTimeout bool
  76. Cancel context.CancelFunc
  77. IsFlusher bool
  78. }
  79. // PoolWorkersList implements the sort.Interface for PoolWorkers
  80. type PoolWorkersList []*PoolWorkers
  81. func init() {
  82. _ = GetManager()
  83. }
  84. // GetManager returns a Manager and initializes one as singleton if there's none yet
  85. func GetManager() *Manager {
  86. if manager == nil {
  87. manager = &Manager{
  88. Queues: make(map[int64]*ManagedQueue),
  89. }
  90. }
  91. return manager
  92. }
  93. // Add adds a queue to this manager
  94. func (m *Manager) Add(managed interface{},
  95. t Type,
  96. configuration,
  97. exemplar interface{}) int64 {
  98. cfg, _ := json.Marshal(configuration)
  99. mq := &ManagedQueue{
  100. Type: t,
  101. Configuration: string(cfg),
  102. ExemplarType: reflect.TypeOf(exemplar).String(),
  103. PoolWorkers: make(map[int64]*PoolWorkers),
  104. Managed: managed,
  105. }
  106. m.mutex.Lock()
  107. m.counter++
  108. mq.QID = m.counter
  109. mq.Name = fmt.Sprintf("queue-%d", mq.QID)
  110. if named, ok := managed.(Named); ok {
  111. name := named.Name()
  112. if len(name) > 0 {
  113. mq.Name = name
  114. }
  115. }
  116. m.Queues[mq.QID] = mq
  117. m.mutex.Unlock()
  118. log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
  119. return mq.QID
  120. }
  121. // Remove a queue from the Manager
  122. func (m *Manager) Remove(qid int64) {
  123. m.mutex.Lock()
  124. delete(m.Queues, qid)
  125. m.mutex.Unlock()
  126. log.Trace("Queue Manager removed: QID: %d", qid)
  127. }
  128. // GetManagedQueue by qid
  129. func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
  130. m.mutex.Lock()
  131. defer m.mutex.Unlock()
  132. return m.Queues[qid]
  133. }
  134. // FlushAll flushes all the flushable queues attached to this manager
  135. func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
  136. var ctx context.Context
  137. var cancel context.CancelFunc
  138. start := time.Now()
  139. end := start
  140. hasTimeout := false
  141. if timeout > 0 {
  142. ctx, cancel = context.WithTimeout(baseCtx, timeout)
  143. end = start.Add(timeout)
  144. hasTimeout = true
  145. } else {
  146. ctx, cancel = context.WithCancel(baseCtx)
  147. }
  148. defer cancel()
  149. for {
  150. select {
  151. case <-ctx.Done():
  152. mqs := m.ManagedQueues()
  153. nonEmptyQueues := []string{}
  154. for _, mq := range mqs {
  155. if !mq.IsEmpty() {
  156. nonEmptyQueues = append(nonEmptyQueues, mq.Name)
  157. }
  158. }
  159. if len(nonEmptyQueues) > 0 {
  160. return fmt.Errorf("flush timeout with non-empty queues: %s", strings.Join(nonEmptyQueues, ", "))
  161. }
  162. return nil
  163. default:
  164. }
  165. mqs := m.ManagedQueues()
  166. log.Debug("Found %d Managed Queues", len(mqs))
  167. wg := sync.WaitGroup{}
  168. wg.Add(len(mqs))
  169. allEmpty := true
  170. for _, mq := range mqs {
  171. if mq.IsEmpty() {
  172. wg.Done()
  173. continue
  174. }
  175. allEmpty = false
  176. if flushable, ok := mq.Managed.(Flushable); ok {
  177. log.Debug("Flushing (flushable) queue: %s", mq.Name)
  178. go func(q *ManagedQueue) {
  179. localCtx, localCtxCancel := context.WithCancel(ctx)
  180. pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true)
  181. err := flushable.FlushWithContext(localCtx)
  182. if err != nil && err != ctx.Err() {
  183. cancel()
  184. }
  185. q.CancelWorkers(pid)
  186. localCtxCancel()
  187. wg.Done()
  188. }(mq)
  189. } else {
  190. log.Debug("Queue: %s is non-empty but is not flushable", mq.Name)
  191. wg.Done()
  192. }
  193. }
  194. if allEmpty {
  195. log.Debug("All queues are empty")
  196. break
  197. }
  198. // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign
  199. // but don't delay cancellation here.
  200. select {
  201. case <-ctx.Done():
  202. case <-time.After(100 * time.Millisecond):
  203. }
  204. wg.Wait()
  205. }
  206. return nil
  207. }
  208. // ManagedQueues returns the managed queues
  209. func (m *Manager) ManagedQueues() []*ManagedQueue {
  210. m.mutex.Lock()
  211. mqs := make([]*ManagedQueue, 0, len(m.Queues))
  212. for _, mq := range m.Queues {
  213. mqs = append(mqs, mq)
  214. }
  215. m.mutex.Unlock()
  216. sort.Sort(ManagedQueueList(mqs))
  217. return mqs
  218. }
  219. // Workers returns the poolworkers
  220. func (q *ManagedQueue) Workers() []*PoolWorkers {
  221. q.mutex.Lock()
  222. workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
  223. for _, worker := range q.PoolWorkers {
  224. workers = append(workers, worker)
  225. }
  226. q.mutex.Unlock()
  227. sort.Sort(PoolWorkersList(workers))
  228. return workers
  229. }
  230. // RegisterWorkers registers workers to this queue
  231. func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
  232. q.mutex.Lock()
  233. defer q.mutex.Unlock()
  234. q.counter++
  235. q.PoolWorkers[q.counter] = &PoolWorkers{
  236. PID: q.counter,
  237. Workers: number,
  238. Start: start,
  239. Timeout: timeout,
  240. HasTimeout: hasTimeout,
  241. Cancel: cancel,
  242. IsFlusher: isFlusher,
  243. }
  244. return q.counter
  245. }
  246. // CancelWorkers cancels pooled workers with pid
  247. func (q *ManagedQueue) CancelWorkers(pid int64) {
  248. q.mutex.Lock()
  249. pw, ok := q.PoolWorkers[pid]
  250. q.mutex.Unlock()
  251. if !ok {
  252. return
  253. }
  254. pw.Cancel()
  255. }
  256. // RemoveWorkers deletes pooled workers with pid
  257. func (q *ManagedQueue) RemoveWorkers(pid int64) {
  258. q.mutex.Lock()
  259. pw, ok := q.PoolWorkers[pid]
  260. delete(q.PoolWorkers, pid)
  261. q.mutex.Unlock()
  262. if ok && pw.Cancel != nil {
  263. pw.Cancel()
  264. }
  265. }
  266. // AddWorkers adds workers to the queue if it has registered an add worker function
  267. func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
  268. if pool, ok := q.Managed.(ManagedPool); ok {
  269. // the cancel will be added to the pool workers description above
  270. return pool.AddWorkers(number, timeout)
  271. }
  272. return nil
  273. }
  274. // Flush flushes the queue with a timeout
  275. func (q *ManagedQueue) Flush(timeout time.Duration) error {
  276. if flushable, ok := q.Managed.(Flushable); ok {
  277. // the cancel will be added to the pool workers description above
  278. return flushable.Flush(timeout)
  279. }
  280. return nil
  281. }
  282. // IsEmpty returns if the queue is empty
  283. func (q *ManagedQueue) IsEmpty() bool {
  284. if flushable, ok := q.Managed.(Flushable); ok {
  285. return flushable.IsEmpty()
  286. }
  287. return true
  288. }
  289. // NumberOfWorkers returns the number of workers in the queue
  290. func (q *ManagedQueue) NumberOfWorkers() int {
  291. if pool, ok := q.Managed.(ManagedPool); ok {
  292. return pool.NumberOfWorkers()
  293. }
  294. return -1
  295. }
  296. // MaxNumberOfWorkers returns the maximum number of workers for the pool
  297. func (q *ManagedQueue) MaxNumberOfWorkers() int {
  298. if pool, ok := q.Managed.(ManagedPool); ok {
  299. return pool.MaxNumberOfWorkers()
  300. }
  301. return 0
  302. }
  303. // BoostWorkers returns the number of workers for a boost
  304. func (q *ManagedQueue) BoostWorkers() int {
  305. if pool, ok := q.Managed.(ManagedPool); ok {
  306. return pool.BoostWorkers()
  307. }
  308. return -1
  309. }
  310. // BoostTimeout returns the timeout of the next boost
  311. func (q *ManagedQueue) BoostTimeout() time.Duration {
  312. if pool, ok := q.Managed.(ManagedPool); ok {
  313. return pool.BoostTimeout()
  314. }
  315. return 0
  316. }
  317. // BlockTimeout returns the timeout til the next boost
  318. func (q *ManagedQueue) BlockTimeout() time.Duration {
  319. if pool, ok := q.Managed.(ManagedPool); ok {
  320. return pool.BlockTimeout()
  321. }
  322. return 0
  323. }
  324. // SetPoolSettings sets the setable boost values
  325. func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
  326. if pool, ok := q.Managed.(ManagedPool); ok {
  327. pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
  328. }
  329. }
  330. func (l ManagedQueueList) Len() int {
  331. return len(l)
  332. }
  333. func (l ManagedQueueList) Less(i, j int) bool {
  334. return l[i].Name < l[j].Name
  335. }
  336. func (l ManagedQueueList) Swap(i, j int) {
  337. l[i], l[j] = l[j], l[i]
  338. }
  339. func (l PoolWorkersList) Len() int {
  340. return len(l)
  341. }
  342. func (l PoolWorkersList) Less(i, j int) bool {
  343. return l[i].Start.Before(l[j].Start)
  344. }
  345. func (l PoolWorkersList) Swap(i, j int) {
  346. l[i], l[j] = l[j], l[i]
  347. }