// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package queue

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/util"
)

// WorkerPool represents a dynamically growable worker pool for a
// provided handler function. It has an internal channel which it
// uses to detect blocking, and it will grow and shrink in
// response to demand as per configuration.
type WorkerPool struct {
	lock               sync.Mutex
	baseCtx            context.Context
	cancel             context.CancelFunc
	cond               *sync.Cond
	qid                int64
	maxNumberOfWorkers int
	numberOfWorkers    int
	batchLength        int
	handle             HandlerFunc
	dataChan           chan Data
	blockTimeout       time.Duration
	boostTimeout       time.Duration
	boostWorkers       int
	numInQueue         int64
}

// WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WorkerPoolConfiguration struct {
	QueueLength  int
	BatchLength  int
	BlockTimeout time.Duration
	BoostTimeout time.Duration
	BoostWorkers int
	MaxWorkers   int
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())

	dataChan := make(chan Data, config.QueueLength)
	pool := &WorkerPool{
		baseCtx:            ctx,
		cancel:             cancel,
		batchLength:        config.BatchLength,
		dataChan:           dataChan,
		handle:             handle,
		blockTimeout:       config.BlockTimeout,
		boostTimeout:       config.BoostTimeout,
		boostWorkers:       config.BoostWorkers,
		maxNumberOfWorkers: config.MaxWorkers,
	}

	return pool
}
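
// Illustrative usage sketch (not part of the original file): assuming
// HandlerFunc is the variadic handler type invoked via p.handle below and Data
// is an arbitrary payload, a pool is typically constructed, given workers and
// fed roughly like this:
//
//	pool := NewWorkerPool(func(items ...Data) {
//		for _, item := range items {
//			log.Trace("processing %v", item)
//		}
//	}, WorkerPoolConfiguration{
//		QueueLength:  20,
//		BatchLength:  4,
//		BlockTimeout: 1 * time.Second,
//		BoostTimeout: 5 * time.Minute,
//		BoostWorkers: 2,
//		MaxWorkers:   10,
//	})
//	cancelWorkers := pool.AddWorkers(2, 0)
//	pool.Push("some payload")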

// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
	atomic.AddInt64(&p.numInQueue, 1)
	p.lock.Lock()
	if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
		p.lock.Unlock()
		p.pushBoost(data)
	} else {
		p.lock.Unlock()
		p.dataChan <- data
	}
}
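
// pushBoost first attempts a non-blocking send to the internal channel. If the
// channel is full it waits up to blockTimeout; should the send still not have
// succeeded, it temporarily adds boostWorkers extra workers (respecting
// maxNumberOfWorkers), doubles blockTimeout, and then blocks until the data is
// accepted. The boost and the doubled timeout are reverted after boostTimeout.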
func (p *WorkerPool) pushBoost(data Data) {
	select {
	case p.dataChan <- data:
	default:
		p.lock.Lock()
		if p.blockTimeout <= 0 {
			p.lock.Unlock()
			p.dataChan <- data
			return
		}
		ourTimeout := p.blockTimeout
		timer := time.NewTimer(p.blockTimeout)
		p.lock.Unlock()
		select {
		case p.dataChan <- data:
			util.StopTimer(timer)
		case <-timer.C:
			p.lock.Lock()
			if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) {
				p.lock.Unlock()
				p.dataChan <- data
				return
			}
			p.blockTimeout *= 2
			ctx, cancel := context.WithCancel(p.baseCtx)
			mq := GetManager().GetManagedQueue(p.qid)
			boost := p.boostWorkers
			if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
				boost = p.maxNumberOfWorkers - p.numberOfWorkers
			}
			if mq != nil {
				log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)

				start := time.Now()
				pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
				go func() {
					<-ctx.Done()
					mq.RemoveWorkers(pid)
					cancel()
				}()
			} else {
				log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
			}
			go func() {
				<-time.After(p.boostTimeout)
				cancel()
				p.lock.Lock()
				p.blockTimeout /= 2
				p.lock.Unlock()
			}()
			p.lock.Unlock()
			p.addWorkers(ctx, boost)
			p.dataChan <- data
		}
	}
}

// NumberOfWorkers returns the number of current workers in the pool
func (p *WorkerPool) NumberOfWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.numberOfWorkers
}

// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func (p *WorkerPool) MaxNumberOfWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.maxNumberOfWorkers
}

// BoostWorkers returns the number of workers for a boost
func (p *WorkerPool) BoostWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.boostWorkers
}

// BoostTimeout returns the timeout of the next boost
func (p *WorkerPool) BoostTimeout() time.Duration {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.boostTimeout
}

// BlockTimeout returns the timeout until the next boost
func (p *WorkerPool) BlockTimeout() time.Duration {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.blockTimeout
}

// SetPoolSettings sets the settable boost values
func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.maxNumberOfWorkers = maxNumberOfWorkers
	p.boostWorkers = boostWorkers
	p.boostTimeout = timeout
}

// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool
// Changing this number will not change the number of current workers but will change the limit
// for future additions
func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.maxNumberOfWorkers = newMax
}
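
// commonRegisterWorkers creates the context (with an optional timeout) for a
// new group of workers and, if a managed queue exists for this pool, registers
// the group with it so the manager can track and cancel those workers.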
func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) {
	var ctx context.Context
	var cancel context.CancelFunc
	start := time.Now()
	end := start
	hasTimeout := false
	if timeout > 0 {
		ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
		end = start.Add(timeout)
		hasTimeout = true
	} else {
		ctx, cancel = context.WithCancel(p.baseCtx)
	}

	mq := GetManager().GetManagedQueue(p.qid)
	if mq != nil {
		pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
		go func() {
			<-ctx.Done()
			mq.RemoveWorkers(pid)
			cancel()
		}()
		log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
	} else {
		log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
	}
	return ctx, cancel
}

// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
	ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
	p.addWorkers(ctx, number)
	return cancel
}

// addWorkers adds workers to the pool
func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
	for i := 0; i < number; i++ {
		p.lock.Lock()
		if p.cond == nil {
			p.cond = sync.NewCond(&p.lock)
		}
		p.numberOfWorkers++
		p.lock.Unlock()
		go func() {
			p.doWork(ctx)

			p.lock.Lock()
			p.numberOfWorkers--
			if p.numberOfWorkers == 0 {
				p.cond.Broadcast()
			} else if p.numberOfWorkers < 0 {
				// numberOfWorkers can't go negative but...
				log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
				p.numberOfWorkers = 0
				p.cond.Broadcast()
			}
			p.lock.Unlock()
		}()
	}
}

// Wait for WorkerPool to finish
func (p *WorkerPool) Wait() {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.cond == nil {
		p.cond = sync.NewCond(&p.lock)
	}
	if p.numberOfWorkers <= 0 {
		return
	}
	p.cond.Wait()
}

// CleanUp will drain the remaining contents of the channel
// This should be called after AddWorkers context is closed
func (p *WorkerPool) CleanUp(ctx context.Context) {
	log.Trace("WorkerPool: %d CleanUp", p.qid)
	close(p.dataChan)
	for data := range p.dataChan {
		p.handle(data)
		atomic.AddInt64(&p.numInQueue, -1)
		select {
		case <-ctx.Done():
			log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
			return
		default:
		}
	}
	log.Trace("WorkerPool: %d CleanUp Done", p.qid)
}

// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
func (p *WorkerPool) Flush(timeout time.Duration) error {
	ctx, cancel := p.commonRegisterWorkers(1, timeout, true)
	defer cancel()
	return p.FlushWithContext(ctx)
}

// IsEmpty returns true if the worker queue is empty
func (p *WorkerPool) IsEmpty() bool {
	return atomic.LoadInt64(&p.numInQueue) == 0
}

// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
// NB: The worker will not be registered with the manager.
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
	log.Trace("WorkerPool: %d Flush", p.qid)
	for {
		select {
		case data := <-p.dataChan:
			p.handle(data)
			atomic.AddInt64(&p.numInQueue, -1)
		case <-p.baseCtx.Done():
			return p.baseCtx.Err()
		case <-ctx.Done():
			return ctx.Err()
		default:
			return nil
		}
	}
}
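
// doWork is the body of a single worker. It accumulates incoming Data into a
// batch of at most batchLength items and passes the batch to the handler; a
// partially filled batch is handled after a short idle delay, when the context
// is cancelled, or when the channel is closed.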
func (p *WorkerPool) doWork(ctx context.Context) {
	delay := time.Millisecond * 300
	var data = make([]Data, 0, p.batchLength)
	for {
		select {
		case <-ctx.Done():
			if len(data) > 0 {
				log.Trace("Handling: %d data, %v", len(data), data)
				p.handle(data...)
				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
			}
			log.Trace("Worker shutting down")
			return
		case datum, ok := <-p.dataChan:
			if !ok {
				// the dataChan has been closed - we should finish up:
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
				}
				log.Trace("Worker shutting down")
				return
			}
			data = append(data, datum)
			if len(data) >= p.batchLength {
				log.Trace("Handling: %d data, %v", len(data), data)
				p.handle(data...)
				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
				data = make([]Data, 0, p.batchLength)
			}
		default:
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				util.StopTimer(timer)
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
				}
				log.Trace("Worker shutting down")
				return
			case datum, ok := <-p.dataChan:
				util.StopTimer(timer)
				if !ok {
					// the dataChan has been closed - we should finish up:
					if len(data) > 0 {
						log.Trace("Handling: %d data, %v", len(data), data)
						p.handle(data...)
						atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
					}
					log.Trace("Worker shutting down")
					return
				}
				data = append(data, datum)
				if len(data) >= p.batchLength {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
					data = make([]Data, 0, p.batchLength)
				}
			case <-timer.C:
				delay = time.Millisecond * 100
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
					data = make([]Data, 0, p.batchLength)
				}
			}
		}
	}
}
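
// Illustrative shutdown sketch (an assumption about call ordering, following
// the CleanUp contract above): flush outstanding data, close the AddWorkers
// context, wait for the workers to exit, then drain whatever is left:
//
//	if err := pool.Flush(30 * time.Second); err != nil {
//		log.Warn("Unable to flush: %v", err)
//	}
//	cancelWorkers()
//	pool.Wait()
//	pool.CleanUp(context.Background())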