You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_disk_channel.go 5.7KB

Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
4 years ago
Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
4 years ago
Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "sync"
  8. "time"
  9. "code.gitea.io/gitea/modules/log"
  10. )
  11. // PersistableChannelQueueType is the type for persistable queue
  12. const PersistableChannelQueueType Type = "persistable-channel"
  13. // PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
  14. type PersistableChannelQueueConfiguration struct {
  15. Name string
  16. DataDir string
  17. BatchLength int
  18. QueueLength int
  19. Timeout time.Duration
  20. MaxAttempts int
  21. Workers int
  22. MaxWorkers int
  23. BlockTimeout time.Duration
  24. BoostTimeout time.Duration
  25. BoostWorkers int
  26. }
  27. // PersistableChannelQueue wraps a channel queue and level queue together
  28. type PersistableChannelQueue struct {
  29. *ChannelQueue
  30. delayedStarter
  31. lock sync.Mutex
  32. closed chan struct{}
  33. }
  34. // NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
  35. // This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
  36. func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  37. configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
  38. if err != nil {
  39. return nil, err
  40. }
  41. config := configInterface.(PersistableChannelQueueConfiguration)
  42. channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
  43. QueueLength: config.QueueLength,
  44. BatchLength: config.BatchLength,
  45. Workers: config.Workers,
  46. MaxWorkers: config.MaxWorkers,
  47. BlockTimeout: config.BlockTimeout,
  48. BoostTimeout: config.BoostTimeout,
  49. BoostWorkers: config.BoostWorkers,
  50. Name: config.Name + "-channel",
  51. }, exemplar)
  52. if err != nil {
  53. return nil, err
  54. }
  55. // the level backend only needs temporary workers to catch up with the previously dropped work
  56. levelCfg := LevelQueueConfiguration{
  57. DataDir: config.DataDir,
  58. QueueLength: config.QueueLength,
  59. BatchLength: config.BatchLength,
  60. Workers: 1,
  61. MaxWorkers: 6,
  62. BlockTimeout: 1 * time.Second,
  63. BoostTimeout: 5 * time.Minute,
  64. BoostWorkers: 5,
  65. Name: config.Name + "-level",
  66. }
  67. levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
  68. if err == nil {
  69. queue := &PersistableChannelQueue{
  70. ChannelQueue: channelQueue.(*ChannelQueue),
  71. delayedStarter: delayedStarter{
  72. internal: levelQueue.(*LevelQueue),
  73. name: config.Name,
  74. },
  75. closed: make(chan struct{}),
  76. }
  77. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
  78. return queue, nil
  79. }
  80. if IsErrInvalidConfiguration(err) {
  81. // Retrying ain't gonna make this any better...
  82. return nil, ErrInvalidConfiguration{cfg: cfg}
  83. }
  84. queue := &PersistableChannelQueue{
  85. ChannelQueue: channelQueue.(*ChannelQueue),
  86. delayedStarter: delayedStarter{
  87. cfg: levelCfg,
  88. underlying: LevelQueueType,
  89. timeout: config.Timeout,
  90. maxAttempts: config.MaxAttempts,
  91. name: config.Name,
  92. },
  93. closed: make(chan struct{}),
  94. }
  95. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
  96. return queue, nil
  97. }
  98. // Name returns the name of this queue
  99. func (p *PersistableChannelQueue) Name() string {
  100. return p.delayedStarter.name
  101. }
  102. // Push will push the indexer data to queue
  103. func (p *PersistableChannelQueue) Push(data Data) error {
  104. select {
  105. case <-p.closed:
  106. return p.internal.Push(data)
  107. default:
  108. return p.ChannelQueue.Push(data)
  109. }
  110. }
  111. // Run starts to run the queue
  112. func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  113. p.lock.Lock()
  114. if p.internal == nil {
  115. err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
  116. p.lock.Unlock()
  117. if err != nil {
  118. log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
  119. return
  120. }
  121. } else {
  122. p.lock.Unlock()
  123. }
  124. atShutdown(context.Background(), p.Shutdown)
  125. atTerminate(context.Background(), p.Terminate)
  126. // Just run the level queue - we shut it down later
  127. go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
  128. go func() {
  129. _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
  130. }()
  131. log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
  132. <-p.closed
  133. log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
  134. p.ChannelQueue.pool.cancel()
  135. p.internal.(*LevelQueue).pool.cancel()
  136. log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
  137. p.ChannelQueue.pool.Wait()
  138. p.internal.(*LevelQueue).pool.Wait()
  139. // Redirect all remaining data in the chan to the internal channel
  140. go func() {
  141. log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
  142. for data := range p.ChannelQueue.pool.dataChan {
  143. _ = p.internal.Push(data)
  144. }
  145. log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
  146. }()
  147. log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
  148. }
  149. // Shutdown processing this queue
  150. func (p *PersistableChannelQueue) Shutdown() {
  151. log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
  152. select {
  153. case <-p.closed:
  154. default:
  155. p.lock.Lock()
  156. defer p.lock.Unlock()
  157. if p.internal != nil {
  158. p.internal.(*LevelQueue).Shutdown()
  159. }
  160. close(p.closed)
  161. }
  162. }
  163. // Terminate this queue and close the queue
  164. func (p *PersistableChannelQueue) Terminate() {
  165. log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
  166. p.Shutdown()
  167. p.lock.Lock()
  168. defer p.lock.Unlock()
  169. if p.internal != nil {
  170. p.internal.(*LevelQueue).Terminate()
  171. }
  172. }
  173. func init() {
  174. queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
  175. }