You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue_disk.go 4.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "encoding/json"
  8. "fmt"
  9. "reflect"
  10. "sync"
  11. "time"
  12. "code.gitea.io/gitea/modules/log"
  13. "gitea.com/lunny/levelqueue"
  14. )
  15. // LevelQueueType is the type for level queue
  16. const LevelQueueType Type = "level"
  // LevelQueueConfiguration holds the settings used by NewLevelQueue to build
  // a LevelQueue and its worker pool.
  type LevelQueueConfiguration struct {
  	// DataDir is the path handed to levelqueue.Open.
  	DataDir string
  	// QueueLength is the capacity of the channel feeding the worker pool.
  	QueueLength int
  	// BatchLength is passed to the worker pool as its batch length.
  	BatchLength int
  	// Workers is the number of workers added to the pool when the queue runs.
  	Workers int
  	// MaxWorkers is passed to the worker pool as its maximum number of workers.
  	MaxWorkers int
  	// BlockTimeout is passed to the worker pool as its block timeout.
  	BlockTimeout time.Duration
  	// BoostTimeout is passed to the worker pool as its boost timeout.
  	BoostTimeout time.Duration
  	// BoostWorkers is passed to the worker pool as its boost worker count.
  	BoostWorkers int
  	// Name identifies the queue in log messages and errors.
  	Name string
  }
  29. // LevelQueue implements a disk library queue
  30. type LevelQueue struct {
  31. pool *WorkerPool
  32. queue *levelqueue.Queue
  33. closed chan struct{}
  34. terminated chan struct{}
  35. lock sync.Mutex
  36. exemplar interface{}
  37. workers int
  38. name string
  39. }
  40. // NewLevelQueue creates a ledis local queue
  41. func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  42. configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
  43. if err != nil {
  44. return nil, err
  45. }
  46. config := configInterface.(LevelQueueConfiguration)
  47. internal, err := levelqueue.Open(config.DataDir)
  48. if err != nil {
  49. return nil, err
  50. }
  51. dataChan := make(chan Data, config.QueueLength)
  52. ctx, cancel := context.WithCancel(context.Background())
  53. queue := &LevelQueue{
  54. pool: &WorkerPool{
  55. baseCtx: ctx,
  56. cancel: cancel,
  57. batchLength: config.BatchLength,
  58. handle: handle,
  59. dataChan: dataChan,
  60. blockTimeout: config.BlockTimeout,
  61. boostTimeout: config.BoostTimeout,
  62. boostWorkers: config.BoostWorkers,
  63. maxNumberOfWorkers: config.MaxWorkers,
  64. },
  65. queue: internal,
  66. exemplar: exemplar,
  67. closed: make(chan struct{}),
  68. terminated: make(chan struct{}),
  69. workers: config.Workers,
  70. name: config.Name,
  71. }
  72. queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool)
  73. return queue, nil
  74. }
  75. // Run starts to run the queue
  76. func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  77. atShutdown(context.Background(), l.Shutdown)
  78. atTerminate(context.Background(), l.Terminate)
  79. go func() {
  80. _ = l.pool.AddWorkers(l.workers, 0)
  81. }()
  82. go l.readToChan()
  83. log.Trace("LevelQueue: %s Waiting til closed", l.name)
  84. <-l.closed
  85. log.Trace("LevelQueue: %s Waiting til done", l.name)
  86. l.pool.Wait()
  87. log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
  88. ctx, cancel := context.WithCancel(context.Background())
  89. atTerminate(ctx, cancel)
  90. l.pool.CleanUp(ctx)
  91. cancel()
  92. log.Trace("LevelQueue: %s Cleaned", l.name)
  93. }
  94. func (l *LevelQueue) readToChan() {
  95. for {
  96. select {
  97. case <-l.closed:
  98. // tell the pool to shutdown.
  99. l.pool.cancel()
  100. return
  101. default:
  102. bs, err := l.queue.RPop()
  103. if err != nil {
  104. if err != levelqueue.ErrNotFound {
  105. log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
  106. }
  107. time.Sleep(time.Millisecond * 100)
  108. continue
  109. }
  110. if len(bs) == 0 {
  111. time.Sleep(time.Millisecond * 100)
  112. continue
  113. }
  114. var data Data
  115. if l.exemplar != nil {
  116. t := reflect.TypeOf(l.exemplar)
  117. n := reflect.New(t)
  118. ne := n.Elem()
  119. err = json.Unmarshal(bs, ne.Addr().Interface())
  120. data = ne.Interface().(Data)
  121. } else {
  122. err = json.Unmarshal(bs, &data)
  123. }
  124. if err != nil {
  125. log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
  126. time.Sleep(time.Millisecond * 100)
  127. continue
  128. }
  129. log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
  130. l.pool.Push(data)
  131. }
  132. }
  133. }
  134. // Push will push the indexer data to queue
  135. func (l *LevelQueue) Push(data Data) error {
  136. if l.exemplar != nil {
  137. // Assert data is of same type as r.exemplar
  138. value := reflect.ValueOf(data)
  139. t := value.Type()
  140. exemplarType := reflect.ValueOf(l.exemplar).Type()
  141. if !t.AssignableTo(exemplarType) || data == nil {
  142. return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
  143. }
  144. }
  145. bs, err := json.Marshal(data)
  146. if err != nil {
  147. return err
  148. }
  149. return l.queue.LPush(bs)
  150. }
  151. // Shutdown this queue and stop processing
  152. func (l *LevelQueue) Shutdown() {
  153. l.lock.Lock()
  154. defer l.lock.Unlock()
  155. log.Trace("LevelQueue: %s Shutdown", l.name)
  156. select {
  157. case <-l.closed:
  158. default:
  159. close(l.closed)
  160. }
  161. }
  162. // Terminate this queue and close the queue
  163. func (l *LevelQueue) Terminate() {
  164. log.Trace("LevelQueue: %s Terminating", l.name)
  165. l.Shutdown()
  166. l.lock.Lock()
  167. select {
  168. case <-l.terminated:
  169. l.lock.Unlock()
  170. default:
  171. close(l.terminated)
  172. l.lock.Unlock()
  173. if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
  174. log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
  175. }
  176. }
  177. }
  178. // Name returns the name of this queue
  179. func (l *LevelQueue) Name() string {
  180. return l.name
  181. }
  182. func init() {
  183. queuesMap[LevelQueueType] = NewLevelQueue
  184. }