You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue_disk_channel.go 5.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "sync"
  8. "time"
  9. "code.gitea.io/gitea/modules/log"
  10. )
  11. // PersistableChannelQueueType is the type for persistable queue
  12. const PersistableChannelQueueType Type = "persistable-channel"
  13. // PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
  14. type PersistableChannelQueueConfiguration struct {
  15. Name string
  16. DataDir string
  17. BatchLength int
  18. QueueLength int
  19. Timeout time.Duration
  20. MaxAttempts int
  21. Workers int
  22. MaxWorkers int
  23. BlockTimeout time.Duration
  24. BoostTimeout time.Duration
  25. BoostWorkers int
  26. }
// PersistableChannelQueue wraps a channel queue and level queue together.
// Push sends data to the embedded ChannelQueue until closed has been closed;
// after that it sends data to the internal queue held by delayedStarter.
type PersistableChannelQueue struct {
	*ChannelQueue
	delayedStarter
	// lock is held while the internal queue is created in Run and while it is
	// shut down or terminated.
	lock sync.Mutex
	// closed is closed by Shutdown; Run blocks on it before cancelling the pools.
	closed chan struct{}
}
  34. // NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
  35. // This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
  36. func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  37. configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
  38. if err != nil {
  39. return nil, err
  40. }
  41. config := configInterface.(PersistableChannelQueueConfiguration)
  42. channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
  43. QueueLength: config.QueueLength,
  44. BatchLength: config.BatchLength,
  45. Workers: config.Workers,
  46. MaxWorkers: config.MaxWorkers,
  47. BlockTimeout: config.BlockTimeout,
  48. BoostTimeout: config.BoostTimeout,
  49. BoostWorkers: config.BoostWorkers,
  50. Name: config.Name + "-channel",
  51. }, exemplar)
  52. if err != nil {
  53. return nil, err
  54. }
  55. // the level backend only needs temporary workers to catch up with the previously dropped work
  56. levelCfg := LevelQueueConfiguration{
  57. DataDir: config.DataDir,
  58. QueueLength: config.QueueLength,
  59. BatchLength: config.BatchLength,
  60. Workers: 1,
  61. MaxWorkers: 6,
  62. BlockTimeout: 1 * time.Second,
  63. BoostTimeout: 5 * time.Minute,
  64. BoostWorkers: 5,
  65. Name: config.Name + "-level",
  66. }
  67. levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
  68. if err == nil {
  69. queue := &PersistableChannelQueue{
  70. ChannelQueue: channelQueue.(*ChannelQueue),
  71. delayedStarter: delayedStarter{
  72. internal: levelQueue.(*LevelQueue),
  73. name: config.Name,
  74. },
  75. closed: make(chan struct{}),
  76. }
  77. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
  78. return queue, nil
  79. }
  80. if IsErrInvalidConfiguration(err) {
  81. // Retrying ain't gonna make this any better...
  82. return nil, ErrInvalidConfiguration{cfg: cfg}
  83. }
  84. queue := &PersistableChannelQueue{
  85. ChannelQueue: channelQueue.(*ChannelQueue),
  86. delayedStarter: delayedStarter{
  87. cfg: levelCfg,
  88. underlying: LevelQueueType,
  89. timeout: config.Timeout,
  90. maxAttempts: config.MaxAttempts,
  91. name: config.Name,
  92. },
  93. closed: make(chan struct{}),
  94. }
  95. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
  96. return queue, nil
  97. }
  98. // Name returns the name of this queue
  99. func (p *PersistableChannelQueue) Name() string {
  100. return p.delayedStarter.name
  101. }
  102. // Push will push the indexer data to queue
  103. func (p *PersistableChannelQueue) Push(data Data) error {
  104. select {
  105. case <-p.closed:
  106. return p.internal.Push(data)
  107. default:
  108. return p.ChannelQueue.Push(data)
  109. }
  110. }
  111. // Run starts to run the queue
  112. func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  113. p.lock.Lock()
  114. if p.internal == nil {
  115. err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
  116. p.lock.Unlock()
  117. if err != nil {
  118. log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
  119. return
  120. }
  121. } else {
  122. p.lock.Unlock()
  123. }
  124. atShutdown(context.Background(), p.Shutdown)
  125. atTerminate(context.Background(), p.Terminate)
  126. // Just run the level queue - we shut it down later
  127. go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
  128. go func() {
  129. _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
  130. }()
  131. log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
  132. <-p.closed
  133. log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
  134. p.ChannelQueue.pool.cancel()
  135. p.internal.(*LevelQueue).pool.cancel()
  136. log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
  137. p.ChannelQueue.pool.Wait()
  138. p.internal.(*LevelQueue).pool.Wait()
  139. // Redirect all remaining data in the chan to the internal channel
  140. go func() {
  141. log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
  142. for data := range p.ChannelQueue.pool.dataChan {
  143. _ = p.internal.Push(data)
  144. }
  145. log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
  146. }()
  147. log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
  148. }
  149. // Shutdown processing this queue
  150. func (p *PersistableChannelQueue) Shutdown() {
  151. log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
  152. select {
  153. case <-p.closed:
  154. default:
  155. p.lock.Lock()
  156. defer p.lock.Unlock()
  157. if p.internal != nil {
  158. p.internal.(*LevelQueue).Shutdown()
  159. }
  160. close(p.closed)
  161. }
  162. }
  163. // Terminate this queue and close the queue
  164. func (p *PersistableChannelQueue) Terminate() {
  165. log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
  166. p.Shutdown()
  167. p.lock.Lock()
  168. defer p.lock.Unlock()
  169. if p.internal != nil {
  170. p.internal.(*LevelQueue).Terminate()
  171. }
  172. }
// init registers NewPersistableChannelQueue in queuesMap under
// PersistableChannelQueueType.
func init() {
	queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
}