You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue_disk_channel.go 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "fmt"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "code.gitea.io/gitea/modules/log"
  12. )
  13. // PersistableChannelQueueType is the type for persistable queue
  14. const PersistableChannelQueueType Type = "persistable-channel"
  // PersistableChannelQueueConfiguration is the configuration for a
  // PersistableChannelQueue, as consumed by NewPersistableChannelQueue.
  type PersistableChannelQueueConfiguration struct {
  	// Name is the queue name; the wrapped queues are named Name+"-channel"
  	// and Name+"-level". DataDir is forwarded to the level queue.
  	Name, DataDir string
  	// BatchLength and QueueLength are forwarded to the worker pool
  	// configuration of both the channel queue and the level queue.
  	BatchLength, QueueLength int
  	// Timeout and MaxAttempts are handed to the delayed starter when the
  	// level queue could not be created at construction time.
  	Timeout     time.Duration
  	MaxAttempts int
  	// Workers and MaxWorkers are applied to the channel queue only; the
  	// level queue is built with fixed worker settings.
  	Workers, MaxWorkers int
  	// BlockTimeout, BoostTimeout and BoostWorkers are forwarded to the
  	// channel queue's worker pool configuration.
  	BlockTimeout, BoostTimeout time.Duration
  	BoostWorkers               int
  }
  29. // PersistableChannelQueue wraps a channel queue and level queue together
  30. // The disk level queue will be used to store data at shutdown and terminate - and will be restored
  31. // on start up.
  32. type PersistableChannelQueue struct {
  33. channelQueue *ChannelQueue
  34. delayedStarter
  35. lock sync.Mutex
  36. closed chan struct{}
  37. }
  38. // NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
  39. // This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
  40. func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  41. configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
  42. if err != nil {
  43. return nil, err
  44. }
  45. config := configInterface.(PersistableChannelQueueConfiguration)
  46. channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
  47. WorkerPoolConfiguration: WorkerPoolConfiguration{
  48. QueueLength: config.QueueLength,
  49. BatchLength: config.BatchLength,
  50. BlockTimeout: config.BlockTimeout,
  51. BoostTimeout: config.BoostTimeout,
  52. BoostWorkers: config.BoostWorkers,
  53. MaxWorkers: config.MaxWorkers,
  54. },
  55. Workers: config.Workers,
  56. Name: config.Name + "-channel",
  57. }, exemplar)
  58. if err != nil {
  59. return nil, err
  60. }
  61. // the level backend only needs temporary workers to catch up with the previously dropped work
  62. levelCfg := LevelQueueConfiguration{
  63. ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
  64. WorkerPoolConfiguration: WorkerPoolConfiguration{
  65. QueueLength: config.QueueLength,
  66. BatchLength: config.BatchLength,
  67. BlockTimeout: 1 * time.Second,
  68. BoostTimeout: 5 * time.Minute,
  69. BoostWorkers: 5,
  70. MaxWorkers: 6,
  71. },
  72. Workers: 1,
  73. Name: config.Name + "-level",
  74. },
  75. DataDir: config.DataDir,
  76. }
  77. levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
  78. if err == nil {
  79. queue := &PersistableChannelQueue{
  80. channelQueue: channelQueue.(*ChannelQueue),
  81. delayedStarter: delayedStarter{
  82. internal: levelQueue.(*LevelQueue),
  83. name: config.Name,
  84. },
  85. closed: make(chan struct{}),
  86. }
  87. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
  88. return queue, nil
  89. }
  90. if IsErrInvalidConfiguration(err) {
  91. // Retrying ain't gonna make this any better...
  92. return nil, ErrInvalidConfiguration{cfg: cfg}
  93. }
  94. queue := &PersistableChannelQueue{
  95. channelQueue: channelQueue.(*ChannelQueue),
  96. delayedStarter: delayedStarter{
  97. cfg: levelCfg,
  98. underlying: LevelQueueType,
  99. timeout: config.Timeout,
  100. maxAttempts: config.MaxAttempts,
  101. name: config.Name,
  102. },
  103. closed: make(chan struct{}),
  104. }
  105. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
  106. return queue, nil
  107. }
  108. // Name returns the name of this queue
  109. func (q *PersistableChannelQueue) Name() string {
  110. return q.delayedStarter.name
  111. }
  112. // Push will push the indexer data to queue
  113. func (q *PersistableChannelQueue) Push(data Data) error {
  114. select {
  115. case <-q.closed:
  116. return q.internal.Push(data)
  117. default:
  118. return q.channelQueue.Push(data)
  119. }
  120. }
  121. // Run starts to run the queue
  122. func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  123. log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
  124. q.lock.Lock()
  125. if q.internal == nil {
  126. err := q.setInternal(atShutdown, q.channelQueue.handle, q.channelQueue.exemplar)
  127. q.lock.Unlock()
  128. if err != nil {
  129. log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
  130. return
  131. }
  132. } else {
  133. q.lock.Unlock()
  134. }
  135. atShutdown(context.Background(), q.Shutdown)
  136. atTerminate(context.Background(), q.Terminate)
  137. // Just run the level queue - we shut it down later
  138. go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
  139. go func() {
  140. _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
  141. }()
  142. log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name)
  143. <-q.closed
  144. log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
  145. q.channelQueue.cancel()
  146. q.internal.(*LevelQueue).cancel()
  147. log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
  148. q.channelQueue.Wait()
  149. q.internal.(*LevelQueue).Wait()
  150. // Redirect all remaining data in the chan to the internal channel
  151. go func() {
  152. log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
  153. for data := range q.channelQueue.dataChan {
  154. _ = q.internal.Push(data)
  155. atomic.AddInt64(&q.channelQueue.numInQueue, -1)
  156. }
  157. log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
  158. }()
  159. log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name)
  160. }
  161. // Flush flushes the queue and blocks till the queue is empty
  162. func (q *PersistableChannelQueue) Flush(timeout time.Duration) error {
  163. var ctx context.Context
  164. var cancel context.CancelFunc
  165. if timeout > 0 {
  166. ctx, cancel = context.WithTimeout(context.Background(), timeout)
  167. } else {
  168. ctx, cancel = context.WithCancel(context.Background())
  169. }
  170. defer cancel()
  171. return q.FlushWithContext(ctx)
  172. }
  173. // FlushWithContext flushes the queue and blocks till the queue is empty
  174. func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
  175. errChan := make(chan error, 1)
  176. go func() {
  177. errChan <- q.channelQueue.FlushWithContext(ctx)
  178. }()
  179. go func() {
  180. q.lock.Lock()
  181. if q.internal == nil {
  182. q.lock.Unlock()
  183. errChan <- fmt.Errorf("not ready to flush internal queue %s yet", q.Name())
  184. return
  185. }
  186. q.lock.Unlock()
  187. errChan <- q.internal.FlushWithContext(ctx)
  188. }()
  189. err1 := <-errChan
  190. err2 := <-errChan
  191. if err1 != nil {
  192. return err1
  193. }
  194. return err2
  195. }
  196. // IsEmpty checks if a queue is empty
  197. func (q *PersistableChannelQueue) IsEmpty() bool {
  198. if !q.channelQueue.IsEmpty() {
  199. return false
  200. }
  201. q.lock.Lock()
  202. defer q.lock.Unlock()
  203. if q.internal == nil {
  204. return false
  205. }
  206. return q.internal.IsEmpty()
  207. }
  208. // Shutdown processing this queue
  209. func (q *PersistableChannelQueue) Shutdown() {
  210. log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
  211. q.lock.Lock()
  212. defer q.lock.Unlock()
  213. select {
  214. case <-q.closed:
  215. default:
  216. if q.internal != nil {
  217. q.internal.(*LevelQueue).Shutdown()
  218. }
  219. close(q.closed)
  220. log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
  221. }
  222. }
  223. // Terminate this queue and close the queue
  224. func (q *PersistableChannelQueue) Terminate() {
  225. log.Trace("PersistableChannelQueue: %s Terminating", q.delayedStarter.name)
  226. q.Shutdown()
  227. q.lock.Lock()
  228. defer q.lock.Unlock()
  229. if q.internal != nil {
  230. q.internal.(*LevelQueue).Terminate()
  231. }
  232. log.Debug("PersistableChannelQueue: %s Terminated", q.delayedStarter.name)
  233. }
  234. func init() {
  235. queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
  236. }