You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_disk_channel.go 7.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "sync"
  8. "time"
  9. "code.gitea.io/gitea/modules/log"
  10. )
  11. // PersistableChannelUniqueQueueType is the type for persistable queue
  12. const PersistableChannelUniqueQueueType Type = "unique-persistable-channel"
  13. // PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue
  14. type PersistableChannelUniqueQueueConfiguration struct {
  15. Name string
  16. DataDir string
  17. BatchLength int
  18. QueueLength int
  19. Timeout time.Duration
  20. MaxAttempts int
  21. Workers int
  22. MaxWorkers int
  23. BlockTimeout time.Duration
  24. BoostTimeout time.Duration
  25. BoostWorkers int
  26. }
  27. // PersistableChannelUniqueQueue wraps a channel queue and level queue together
  28. //
  29. // Please note that this Queue does not guarantee that a particular
  30. // task cannot be processed twice or more at the same time. Uniqueness is
  31. // only guaranteed whilst the task is waiting in the queue.
  32. type PersistableChannelUniqueQueue struct {
  33. *ChannelUniqueQueue
  34. delayedStarter
  35. lock sync.Mutex
  36. closed chan struct{}
  37. }
  38. // NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
  39. // This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
  40. func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  41. configInterface, err := toConfig(PersistableChannelUniqueQueueConfiguration{}, cfg)
  42. if err != nil {
  43. return nil, err
  44. }
  45. config := configInterface.(PersistableChannelUniqueQueueConfiguration)
  46. channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{
  47. WorkerPoolConfiguration: WorkerPoolConfiguration{
  48. QueueLength: config.QueueLength,
  49. BatchLength: config.BatchLength,
  50. BlockTimeout: config.BlockTimeout,
  51. BoostTimeout: config.BoostTimeout,
  52. BoostWorkers: config.BoostWorkers,
  53. MaxWorkers: config.MaxWorkers,
  54. },
  55. Workers: config.Workers,
  56. Name: config.Name + "-channel",
  57. }, exemplar)
  58. if err != nil {
  59. return nil, err
  60. }
  61. // the level backend only needs temporary workers to catch up with the previously dropped work
  62. levelCfg := LevelUniqueQueueConfiguration{
  63. ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
  64. WorkerPoolConfiguration: WorkerPoolConfiguration{
  65. QueueLength: config.QueueLength,
  66. BatchLength: config.BatchLength,
  67. BlockTimeout: 0,
  68. BoostTimeout: 0,
  69. BoostWorkers: 0,
  70. MaxWorkers: 1,
  71. },
  72. Workers: 1,
  73. Name: config.Name + "-level",
  74. },
  75. DataDir: config.DataDir,
  76. }
  77. queue := &PersistableChannelUniqueQueue{
  78. ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue),
  79. closed: make(chan struct{}),
  80. }
  81. levelQueue, err := NewLevelUniqueQueue(func(data ...Data) {
  82. for _, datum := range data {
  83. err := queue.Push(datum)
  84. if err != nil && err != ErrAlreadyInQueue {
  85. log.Error("Unable push to channelled queue: %v", err)
  86. }
  87. }
  88. }, levelCfg, exemplar)
  89. if err == nil {
  90. queue.delayedStarter = delayedStarter{
  91. internal: levelQueue.(*LevelUniqueQueue),
  92. name: config.Name,
  93. }
  94. _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
  95. return queue, nil
  96. }
  97. if IsErrInvalidConfiguration(err) {
  98. // Retrying ain't gonna make this any better...
  99. return nil, ErrInvalidConfiguration{cfg: cfg}
  100. }
  101. queue.delayedStarter = delayedStarter{
  102. cfg: levelCfg,
  103. underlying: LevelUniqueQueueType,
  104. timeout: config.Timeout,
  105. maxAttempts: config.MaxAttempts,
  106. name: config.Name,
  107. }
  108. _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
  109. return queue, nil
  110. }
  111. // Name returns the name of this queue
  112. func (q *PersistableChannelUniqueQueue) Name() string {
  113. return q.delayedStarter.name
  114. }
  115. // Push will push the indexer data to queue
  116. func (q *PersistableChannelUniqueQueue) Push(data Data) error {
  117. return q.PushFunc(data, nil)
  118. }
  119. // PushFunc will push the indexer data to queue
  120. func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
  121. select {
  122. case <-q.closed:
  123. return q.internal.(UniqueQueue).PushFunc(data, fn)
  124. default:
  125. return q.ChannelUniqueQueue.PushFunc(data, fn)
  126. }
  127. }
  128. // Has will test if the queue has the data
  129. func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
  130. // This is more difficult...
  131. has, err := q.ChannelUniqueQueue.Has(data)
  132. if err != nil || has {
  133. return has, err
  134. }
  135. return q.internal.(UniqueQueue).Has(data)
  136. }
  137. // Run starts to run the queue
  138. func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  139. log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
  140. q.lock.Lock()
  141. if q.internal == nil {
  142. err := q.setInternal(atShutdown, func(data ...Data) {
  143. for _, datum := range data {
  144. err := q.Push(datum)
  145. if err != nil && err != ErrAlreadyInQueue {
  146. log.Error("Unable push to channelled queue: %v", err)
  147. }
  148. }
  149. }, q.exemplar)
  150. q.lock.Unlock()
  151. if err != nil {
  152. log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
  153. return
  154. }
  155. } else {
  156. q.lock.Unlock()
  157. }
  158. atShutdown(context.Background(), q.Shutdown)
  159. atTerminate(context.Background(), q.Terminate)
  160. // Just run the level queue - we shut it down later
  161. go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
  162. go func() {
  163. _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0)
  164. }()
  165. log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name)
  166. <-q.closed
  167. log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
  168. q.internal.(*LevelUniqueQueue).cancel()
  169. q.ChannelUniqueQueue.cancel()
  170. log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
  171. q.ChannelUniqueQueue.Wait()
  172. q.internal.(*LevelUniqueQueue).Wait()
  173. // Redirect all remaining data in the chan to the internal channel
  174. go func() {
  175. log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
  176. for data := range q.ChannelUniqueQueue.dataChan {
  177. _ = q.internal.Push(data)
  178. }
  179. log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
  180. }()
  181. log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name)
  182. }
  183. // Flush flushes the queue
  184. func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
  185. return q.ChannelUniqueQueue.Flush(timeout)
  186. }
  187. // Shutdown processing this queue
  188. func (q *PersistableChannelUniqueQueue) Shutdown() {
  189. log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
  190. q.lock.Lock()
  191. defer q.lock.Unlock()
  192. select {
  193. case <-q.closed:
  194. default:
  195. if q.internal != nil {
  196. q.internal.(*LevelUniqueQueue).Shutdown()
  197. }
  198. close(q.closed)
  199. }
  200. log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
  201. }
  202. // Terminate this queue and close the queue
  203. func (q *PersistableChannelUniqueQueue) Terminate() {
  204. log.Trace("PersistableChannelUniqueQueue: %s Terminating", q.delayedStarter.name)
  205. q.Shutdown()
  206. q.lock.Lock()
  207. defer q.lock.Unlock()
  208. if q.internal != nil {
  209. q.internal.(*LevelUniqueQueue).Terminate()
  210. }
  211. log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
  212. }
  213. func init() {
  214. queuesMap[PersistableChannelUniqueQueueType] = NewPersistableChannelUniqueQueue
  215. }