You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_wrapped.go 6.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "fmt"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "code.gitea.io/gitea/modules/log"
  12. )
  13. // WrappedQueueType is the type for a wrapped delayed starting queue
  14. const WrappedQueueType Type = "wrapped"
  15. // WrappedQueueConfiguration is the configuration for a WrappedQueue
  16. type WrappedQueueConfiguration struct {
  17. Underlying Type
  18. Timeout time.Duration
  19. MaxAttempts int
  20. Config interface{}
  21. QueueLength int
  22. Name string
  23. }
  24. type delayedStarter struct {
  25. internal Queue
  26. underlying Type
  27. cfg interface{}
  28. timeout time.Duration
  29. maxAttempts int
  30. name string
  31. }
  32. // setInternal must be called with the lock locked.
  33. func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error {
  34. var ctx context.Context
  35. var cancel context.CancelFunc
  36. if q.timeout > 0 {
  37. ctx, cancel = context.WithTimeout(context.Background(), q.timeout)
  38. } else {
  39. ctx, cancel = context.WithCancel(context.Background())
  40. }
  41. defer cancel()
  42. // Ensure we also stop at shutdown
  43. atShutdown(ctx, func() {
  44. cancel()
  45. })
  46. i := 1
  47. for q.internal == nil {
  48. select {
  49. case <-ctx.Done():
  50. return fmt.Errorf("Timedout creating queue %v with cfg %s in %s", q.underlying, q.cfg, q.name)
  51. default:
  52. queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
  53. if err == nil {
  54. q.internal = queue
  55. break
  56. }
  57. if err.Error() != "resource temporarily unavailable" {
  58. log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %s error: %v", i, q.underlying, q.name, q.cfg, err)
  59. }
  60. i++
  61. if q.maxAttempts > 0 && i > q.maxAttempts {
  62. return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
  63. }
  64. sleepTime := 100 * time.Millisecond
  65. if q.timeout > 0 && q.maxAttempts > 0 {
  66. sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts)
  67. }
  68. t := time.NewTimer(sleepTime)
  69. select {
  70. case <-ctx.Done():
  71. t.Stop()
  72. case <-t.C:
  73. }
  74. }
  75. }
  76. return nil
  77. }
  78. // WrappedQueue wraps a delayed starting queue
  79. type WrappedQueue struct {
  80. delayedStarter
  81. lock sync.Mutex
  82. handle HandlerFunc
  83. exemplar interface{}
  84. channel chan Data
  85. numInQueue int64
  86. }
  87. // NewWrappedQueue will attempt to create a queue of the provided type,
  88. // but if there is a problem creating this queue it will instead create
  89. // a WrappedQueue with delayed startup of the queue instead and a
  90. // channel which will be redirected to the queue
  91. func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  92. configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg)
  93. if err != nil {
  94. return nil, err
  95. }
  96. config := configInterface.(WrappedQueueConfiguration)
  97. queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
  98. if err == nil {
  99. // Just return the queue there is no need to wrap
  100. return queue, nil
  101. }
  102. if IsErrInvalidConfiguration(err) {
  103. // Retrying ain't gonna make this any better...
  104. return nil, ErrInvalidConfiguration{cfg: cfg}
  105. }
  106. queue = &WrappedQueue{
  107. handle: handle,
  108. channel: make(chan Data, config.QueueLength),
  109. exemplar: exemplar,
  110. delayedStarter: delayedStarter{
  111. cfg: config.Config,
  112. underlying: config.Underlying,
  113. timeout: config.Timeout,
  114. maxAttempts: config.MaxAttempts,
  115. name: config.Name,
  116. },
  117. }
  118. _ = GetManager().Add(queue, WrappedQueueType, config, exemplar)
  119. return queue, nil
  120. }
  121. // Name returns the name of the queue
  122. func (q *WrappedQueue) Name() string {
  123. return q.name + "-wrapper"
  124. }
  125. // Push will push the data to the internal channel checking it against the exemplar
  126. func (q *WrappedQueue) Push(data Data) error {
  127. if !assignableTo(data, q.exemplar) {
  128. return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
  129. }
  130. atomic.AddInt64(&q.numInQueue, 1)
  131. q.channel <- data
  132. return nil
  133. }
  134. func (q *WrappedQueue) flushInternalWithContext(ctx context.Context) error {
  135. q.lock.Lock()
  136. if q.internal == nil {
  137. q.lock.Unlock()
  138. return fmt.Errorf("not ready to flush wrapped queue %s yet", q.Name())
  139. }
  140. q.lock.Unlock()
  141. select {
  142. case <-ctx.Done():
  143. return ctx.Err()
  144. default:
  145. }
  146. return q.internal.FlushWithContext(ctx)
  147. }
  148. // Flush flushes the queue and blocks till the queue is empty
  149. func (q *WrappedQueue) Flush(timeout time.Duration) error {
  150. var ctx context.Context
  151. var cancel context.CancelFunc
  152. if timeout > 0 {
  153. ctx, cancel = context.WithTimeout(context.Background(), timeout)
  154. } else {
  155. ctx, cancel = context.WithCancel(context.Background())
  156. }
  157. defer cancel()
  158. return q.FlushWithContext(ctx)
  159. }
  160. // FlushWithContext implements the final part of Flushable
  161. func (q *WrappedQueue) FlushWithContext(ctx context.Context) error {
  162. log.Trace("WrappedQueue: %s FlushWithContext", q.Name())
  163. errChan := make(chan error, 1)
  164. go func() {
  165. errChan <- q.flushInternalWithContext(ctx)
  166. close(errChan)
  167. }()
  168. select {
  169. case err := <-errChan:
  170. return err
  171. case <-ctx.Done():
  172. go func() {
  173. <-errChan
  174. }()
  175. return ctx.Err()
  176. }
  177. }
  178. // IsEmpty checks whether the queue is empty
  179. func (q *WrappedQueue) IsEmpty() bool {
  180. if atomic.LoadInt64(&q.numInQueue) != 0 {
  181. return false
  182. }
  183. q.lock.Lock()
  184. defer q.lock.Unlock()
  185. if q.internal == nil {
  186. return false
  187. }
  188. return q.internal.IsEmpty()
  189. }
  190. // Run starts to run the queue and attempts to create the internal queue
  191. func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  192. log.Debug("WrappedQueue: %s Starting", q.name)
  193. q.lock.Lock()
  194. if q.internal == nil {
  195. err := q.setInternal(atShutdown, q.handle, q.exemplar)
  196. q.lock.Unlock()
  197. if err != nil {
  198. log.Fatal("Unable to set the internal queue for %s Error: %v", q.Name(), err)
  199. return
  200. }
  201. go func() {
  202. for data := range q.channel {
  203. _ = q.internal.Push(data)
  204. atomic.AddInt64(&q.numInQueue, -1)
  205. }
  206. }()
  207. } else {
  208. q.lock.Unlock()
  209. }
  210. q.internal.Run(atShutdown, atTerminate)
  211. log.Trace("WrappedQueue: %s Done", q.name)
  212. }
  213. // Shutdown this queue and stop processing
  214. func (q *WrappedQueue) Shutdown() {
  215. log.Trace("WrappedQueue: %s Shutting down", q.name)
  216. q.lock.Lock()
  217. defer q.lock.Unlock()
  218. if q.internal == nil {
  219. return
  220. }
  221. if shutdownable, ok := q.internal.(Shutdownable); ok {
  222. shutdownable.Shutdown()
  223. }
  224. log.Debug("WrappedQueue: %s Shutdown", q.name)
  225. }
  226. // Terminate this queue and close the queue
  227. func (q *WrappedQueue) Terminate() {
  228. log.Trace("WrappedQueue: %s Terminating", q.name)
  229. q.lock.Lock()
  230. defer q.lock.Unlock()
  231. if q.internal == nil {
  232. return
  233. }
  234. if shutdownable, ok := q.internal.(Shutdownable); ok {
  235. shutdownable.Terminate()
  236. }
  237. log.Debug("WrappedQueue: %s Terminated", q.name)
  238. }
  239. func init() {
  240. queuesMap[WrappedQueueType] = NewWrappedQueue
  241. }