
workerpool.go

// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package queue

import (
	"context"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/util"
)

// WorkerPool takes data from an internal channel and passes it to its HandlerFunc in
// batches, adding temporary boost workers when the channel blocks.
type WorkerPool struct {
	lock               sync.Mutex
	baseCtx            context.Context
	cancel             context.CancelFunc
	cond               *sync.Cond
	qid                int64
	maxNumberOfWorkers int
	numberOfWorkers    int
	batchLength        int
	handle             HandlerFunc
	dataChan           chan Data
	blockTimeout       time.Duration
	boostTimeout       time.Duration
	boostWorkers       int
}

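// A minimal sketch of how a pool might be wired up elsewhere in this package; the
// field values below are illustrative assumptions, not defaults taken from this file:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	pool := &WorkerPool{
//		baseCtx:            ctx,
//		cancel:             cancel,
//		batchLength:        20,
//		handle:             func(data ...Data) { /* process one batch */ },
//		dataChan:           make(chan Data, 100),
//		blockTimeout:       time.Second,
//		boostTimeout:       5 * time.Minute,
//		boostWorkers:       5,
//		maxNumberOfWorkers: 10,
//	}
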
// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
	p.lock.Lock()
	if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
		p.lock.Unlock()
		p.pushBoost(data)
	} else {
		p.lock.Unlock()
		p.dataChan <- data
	}
}

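// Typical call site (illustrative only): a producer hands off its payload and the pool
// decides whether it can be queued immediately or needs a temporary worker boost:
//
//	pool.Push(payload) // payload is whatever Data the pool's HandlerFunc expects
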
// pushBoost pushes data to the internal channel and, if the channel stays blocked for
// longer than blockTimeout, adds a batch of temporary boost workers.
func (p *WorkerPool) pushBoost(data Data) {
	select {
	case p.dataChan <- data:
	default:
		p.lock.Lock()
		if p.blockTimeout <= 0 {
			p.lock.Unlock()
			p.dataChan <- data
			return
		}
		ourTimeout := p.blockTimeout
		timer := time.NewTimer(p.blockTimeout)
		p.lock.Unlock()
		select {
		case p.dataChan <- data:
			util.StopTimer(timer)
		case <-timer.C:
			p.lock.Lock()
			if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) {
				// Another boost is already in flight or the worker limit has been reached -
				// just block until there is room on the channel
				p.lock.Unlock()
				p.dataChan <- data
				return
			}
			p.blockTimeout *= 2
			ctx, cancel := context.WithCancel(p.baseCtx)
			mq := GetManager().GetManagedQueue(p.qid)
			boost := p.boostWorkers
			if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
				boost = p.maxNumberOfWorkers - p.numberOfWorkers
			}
			if mq != nil {
				log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)

				start := time.Now()
				pid := mq.RegisterWorkers(boost, start, false, start, cancel)
				go func() {
					<-ctx.Done()
					mq.RemoveWorkers(pid)
					cancel()
				}()
			} else {
				log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
			}
			// Retire the boost workers and halve the block timeout once boostTimeout has passed
			go func() {
				<-time.After(p.boostTimeout)
				cancel()
				p.lock.Lock()
				p.blockTimeout /= 2
				p.lock.Unlock()
			}()
			p.lock.Unlock()
			p.addWorkers(ctx, boost)
			p.dataChan <- data
		}
	}
}

// NumberOfWorkers returns the number of current workers in the pool
func (p *WorkerPool) NumberOfWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.numberOfWorkers
}

// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func (p *WorkerPool) MaxNumberOfWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.maxNumberOfWorkers
}

// BoostWorkers returns the number of workers for a boost
func (p *WorkerPool) BoostWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.boostWorkers
}

// BoostTimeout returns the timeout of the next boost
func (p *WorkerPool) BoostTimeout() time.Duration {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.boostTimeout
}

// BlockTimeout returns the timeout until the next boost
func (p *WorkerPool) BlockTimeout() time.Duration {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.blockTimeout
}

// SetSettings sets the settable boost values
func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.maxNumberOfWorkers = maxNumberOfWorkers
	p.boostWorkers = boostWorkers
	p.boostTimeout = timeout
}

// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool
// Changing this number will not change the number of current workers but will change the limit
// for future additions
func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.maxNumberOfWorkers = newMax
}

// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
	var ctx context.Context
	var cancel context.CancelFunc
	start := time.Now()
	end := start
	hasTimeout := false
	if timeout > 0 {
		ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
		end = start.Add(timeout)
		hasTimeout = true
	} else {
		ctx, cancel = context.WithCancel(p.baseCtx)
	}

	mq := GetManager().GetManagedQueue(p.qid)
	if mq != nil {
		pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
		go func() {
			<-ctx.Done()
			mq.RemoveWorkers(pid)
			cancel()
		}()
		log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
	} else {
		log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
	}
	p.addWorkers(ctx, number)
	return cancel
}

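// Example (illustrative, not taken from this file): temporarily add five workers that
// retire after a minute unless cancelled earlier:
//
//	cancel := pool.AddWorkers(5, time.Minute)
//	defer cancel()
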
// addWorkers adds workers to the pool
func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
	for i := 0; i < number; i++ {
		p.lock.Lock()
		if p.cond == nil {
			p.cond = sync.NewCond(&p.lock)
		}
		p.numberOfWorkers++
		p.lock.Unlock()
		go func() {
			p.doWork(ctx)

			p.lock.Lock()
			p.numberOfWorkers--
			if p.numberOfWorkers == 0 {
				p.cond.Broadcast()
			} else if p.numberOfWorkers < 0 {
				// numberOfWorkers can't go negative but...
				log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
				p.numberOfWorkers = 0
				p.cond.Broadcast()
			}
			p.lock.Unlock()
		}()
	}
}

// Wait for WorkerPool to finish
func (p *WorkerPool) Wait() {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.cond == nil {
		p.cond = sync.NewCond(&p.lock)
	}
	if p.numberOfWorkers <= 0 {
		return
	}
	p.cond.Wait()
}

// CleanUp will drain the remaining contents of the channel
// This should be called after AddWorkers context is closed
func (p *WorkerPool) CleanUp(ctx context.Context) {
	log.Trace("WorkerPool: %d CleanUp", p.qid)
	close(p.dataChan)
	for data := range p.dataChan {
		p.handle(data)
		select {
		case <-ctx.Done():
			log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
			return
		default:
		}
	}
	log.Trace("WorkerPool: %d CleanUp Done", p.qid)
}

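// A sketch of a shutdown sequence, assuming the caller still holds the cancel function
// returned by AddWorkers (the names below are illustrative):
//
//	cancelWorkers()  // close the workers' context so they stop pulling from the channel
//	pool.Wait()      // wait for every worker goroutine to exit
//	shutdownCtx, done := context.WithTimeout(context.Background(), 10*time.Second)
//	defer done()
//	pool.CleanUp(shutdownCtx) // drain whatever is still queued on the channel
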
// doWork pulls data from the channel and hands it to the handler in batches until the
// context is done or the channel is closed, flushing partial batches after a short delay.
func (p *WorkerPool) doWork(ctx context.Context) {
	delay := time.Millisecond * 300
	var data = make([]Data, 0, p.batchLength)
	for {
		select {
		case <-ctx.Done():
			if len(data) > 0 {
				log.Trace("Handling: %d data, %v", len(data), data)
				p.handle(data...)
			}
			log.Trace("Worker shutting down")
			return
		case datum, ok := <-p.dataChan:
			if !ok {
				// the dataChan has been closed - we should finish up:
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
				}
				log.Trace("Worker shutting down")
				return
			}
			data = append(data, datum)
			if len(data) >= p.batchLength {
				log.Trace("Handling: %d data, %v", len(data), data)
				p.handle(data...)
				data = make([]Data, 0, p.batchLength)
			}
		default:
			// Nothing immediately available - wait up to delay for more data before
			// flushing whatever has accumulated in the current batch
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				util.StopTimer(timer)
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
				}
				log.Trace("Worker shutting down")
				return
			case datum, ok := <-p.dataChan:
				util.StopTimer(timer)
				if !ok {
					// the dataChan has been closed - we should finish up:
					if len(data) > 0 {
						log.Trace("Handling: %d data, %v", len(data), data)
						p.handle(data...)
					}
					log.Trace("Worker shutting down")
					return
				}
				data = append(data, datum)
				if len(data) >= p.batchLength {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					data = make([]Data, 0, p.batchLength)
				}
			case <-timer.C:
				delay = time.Millisecond * 100
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					data = make([]Data, 0, p.batchLength)
				}
			}
		}
	}
}