You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_wrapped.go 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "fmt"
  7. "sync"
  8. "time"
  9. )
// WrappedUniqueQueueType is the type for a wrapped delayed starting unique queue.
// It is the key under which NewWrappedUniqueQueue is registered in queuesMap.
const WrappedUniqueQueueType Type = "unique-wrapped"
// WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue
type WrappedUniqueQueueConfiguration struct {
	// Underlying is the type of queue NewWrappedUniqueQueue attempts to create.
	Underlying Type
	// Timeout is copied into the delayed starter's timeout field.
	Timeout time.Duration
	// MaxAttempts is copied into the delayed starter's maxAttempts field.
	MaxAttempts int
	// Config is the configuration passed on to the underlying queue.
	Config interface{}
	// QueueLength is the buffer size of the wrapper's internal channel.
	QueueLength int
	// Name is copied into the delayed starter's name field.
	Name string
}
// WrappedUniqueQueue wraps a delayed starting unique queue
type WrappedUniqueQueue struct {
	*WrappedQueue
	// table holds the data pushed through the wrapper while it is not yet
	// ready; it is what PushFunc and Has consult for uniqueness.
	table map[Data]bool
	// tlock guards table and ready.
	tlock sync.Mutex
	// ready is set by the handler once table has been emptied; from then on
	// PushFunc and Has are delegated straight to the internal queue.
	ready bool
}
  28. // NewWrappedUniqueQueue will attempt to create a unique queue of the provided type,
  29. // but if there is a problem creating this queue it will instead create
  30. // a WrappedUniqueQueue with delayed startup of the queue instead and a
  31. // channel which will be redirected to the queue
  32. //
  33. // Please note that this Queue does not guarantee that a particular
  34. // task cannot be processed twice or more at the same time. Uniqueness is
  35. // only guaranteed whilst the task is waiting in the queue.
  36. func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  37. configInterface, err := toConfig(WrappedUniqueQueueConfiguration{}, cfg)
  38. if err != nil {
  39. return nil, err
  40. }
  41. config := configInterface.(WrappedUniqueQueueConfiguration)
  42. queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
  43. if err == nil {
  44. // Just return the queue there is no need to wrap
  45. return queue, nil
  46. }
  47. if IsErrInvalidConfiguration(err) {
  48. // Retrying ain't gonna make this any better...
  49. return nil, ErrInvalidConfiguration{cfg: cfg}
  50. }
  51. wrapped := &WrappedUniqueQueue{
  52. WrappedQueue: &WrappedQueue{
  53. channel: make(chan Data, config.QueueLength),
  54. exemplar: exemplar,
  55. delayedStarter: delayedStarter{
  56. cfg: config.Config,
  57. underlying: config.Underlying,
  58. timeout: config.Timeout,
  59. maxAttempts: config.MaxAttempts,
  60. name: config.Name,
  61. },
  62. },
  63. table: map[Data]bool{},
  64. }
  65. // wrapped.handle is passed to the delayedStarting internal queue and is run to handle
  66. // data passed to
  67. wrapped.handle = func(data ...Data) {
  68. for _, datum := range data {
  69. wrapped.tlock.Lock()
  70. if !wrapped.ready {
  71. delete(wrapped.table, data)
  72. // If our table is empty all of the requests we have buffered between the
  73. // wrapper queue starting and the internal queue starting have been handled.
  74. // We can stop buffering requests in our local table and just pass Push
  75. // direct to the internal queue
  76. if len(wrapped.table) == 0 {
  77. wrapped.ready = true
  78. }
  79. }
  80. wrapped.tlock.Unlock()
  81. handle(datum)
  82. }
  83. }
  84. _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar)
  85. return wrapped, nil
  86. }
  87. // Push will push the data to the internal channel checking it against the exemplar
  88. func (q *WrappedUniqueQueue) Push(data Data) error {
  89. return q.PushFunc(data, nil)
  90. }
  91. // PushFunc will push the data to the internal channel checking it against the exemplar
  92. func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
  93. if !assignableTo(data, q.exemplar) {
  94. return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
  95. }
  96. q.tlock.Lock()
  97. if q.ready {
  98. // ready means our table is empty and all of the requests we have buffered between the
  99. // wrapper queue starting and the internal queue starting have been handled.
  100. // We can stop buffering requests in our local table and just pass Push
  101. // direct to the internal queue
  102. q.tlock.Unlock()
  103. return q.internal.(UniqueQueue).PushFunc(data, fn)
  104. }
  105. locked := true
  106. defer func() {
  107. if locked {
  108. q.tlock.Unlock()
  109. }
  110. }()
  111. if _, ok := q.table[data]; ok {
  112. return ErrAlreadyInQueue
  113. }
  114. // FIXME: We probably need to implement some sort of limit here
  115. // If the downstream queue blocks this table will grow without limit
  116. q.table[data] = true
  117. if fn != nil {
  118. err := fn()
  119. if err != nil {
  120. delete(q.table, data)
  121. return err
  122. }
  123. }
  124. locked = false
  125. q.tlock.Unlock()
  126. q.channel <- data
  127. return nil
  128. }
  129. // Has checks if the data is in the queue
  130. func (q *WrappedUniqueQueue) Has(data Data) (bool, error) {
  131. q.tlock.Lock()
  132. defer q.tlock.Unlock()
  133. if q.ready {
  134. return q.internal.(UniqueQueue).Has(data)
  135. }
  136. _, has := q.table[data]
  137. return has, nil
  138. }
  139. // IsEmpty checks whether the queue is empty
  140. func (q *WrappedUniqueQueue) IsEmpty() bool {
  141. q.tlock.Lock()
  142. if len(q.table) > 0 {
  143. q.tlock.Unlock()
  144. return false
  145. }
  146. if q.ready {
  147. q.tlock.Unlock()
  148. return q.internal.IsEmpty()
  149. }
  150. q.tlock.Unlock()
  151. return false
  152. }
// init registers NewWrappedUniqueQueue in queuesMap under WrappedUniqueQueueType.
func init() {
	queuesMap[WrappedUniqueQueueType] = NewWrappedUniqueQueue
}