You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_channel.go 3.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "fmt"
  8. "sync"
  9. "code.gitea.io/gitea/modules/log"
  10. )
  11. // ChannelUniqueQueueType is the type for channel queue
  12. const ChannelUniqueQueueType Type = "unique-channel"
  13. // ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
  14. type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
  15. // ChannelUniqueQueue implements UniqueQueue
  16. //
  17. // It is basically a thin wrapper around a WorkerPool but keeps a store of
  18. // what has been pushed within a table.
  19. //
  20. // Please note that this Queue does not guarantee that a particular
  21. // task cannot be processed twice or more at the same time. Uniqueness is
  22. // only guaranteed whilst the task is waiting in the queue.
  23. type ChannelUniqueQueue struct {
  24. *WorkerPool
  25. lock sync.Mutex
  26. table map[Data]bool
  27. exemplar interface{}
  28. workers int
  29. name string
  30. }
  31. // NewChannelUniqueQueue create a memory channel queue
  32. func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  33. configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg)
  34. if err != nil {
  35. return nil, err
  36. }
  37. config := configInterface.(ChannelUniqueQueueConfiguration)
  38. if config.BatchLength == 0 {
  39. config.BatchLength = 1
  40. }
  41. queue := &ChannelUniqueQueue{
  42. table: map[Data]bool{},
  43. exemplar: exemplar,
  44. workers: config.Workers,
  45. name: config.Name,
  46. }
  47. queue.WorkerPool = NewWorkerPool(func(data ...Data) {
  48. for _, datum := range data {
  49. queue.lock.Lock()
  50. delete(queue.table, datum)
  51. queue.lock.Unlock()
  52. handle(datum)
  53. }
  54. }, config.WorkerPoolConfiguration)
  55. queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
  56. return queue, nil
  57. }
  58. // Run starts to run the queue
  59. func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  60. atShutdown(context.Background(), func() {
  61. log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
  62. })
  63. atTerminate(context.Background(), func() {
  64. log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
  65. })
  66. log.Debug("ChannelUniqueQueue: %s Starting", q.name)
  67. go func() {
  68. _ = q.AddWorkers(q.workers, 0)
  69. }()
  70. }
  71. // Push will push data into the queue if the data is not already in the queue
  72. func (q *ChannelUniqueQueue) Push(data Data) error {
  73. return q.PushFunc(data, nil)
  74. }
  75. // PushFunc will push data into the queue
  76. func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
  77. if !assignableTo(data, q.exemplar) {
  78. return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
  79. }
  80. q.lock.Lock()
  81. locked := true
  82. defer func() {
  83. if locked {
  84. q.lock.Unlock()
  85. }
  86. }()
  87. if _, ok := q.table[data]; ok {
  88. return ErrAlreadyInQueue
  89. }
  90. // FIXME: We probably need to implement some sort of limit here
  91. // If the downstream queue blocks this table will grow without limit
  92. q.table[data] = true
  93. if fn != nil {
  94. err := fn()
  95. if err != nil {
  96. delete(q.table, data)
  97. return err
  98. }
  99. }
  100. locked = false
  101. q.lock.Unlock()
  102. q.WorkerPool.Push(data)
  103. return nil
  104. }
  105. // Has checks if the data is in the queue
  106. func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
  107. q.lock.Lock()
  108. defer q.lock.Unlock()
  109. _, has := q.table[data]
  110. return has, nil
  111. }
  112. // Name returns the name of this queue
  113. func (q *ChannelUniqueQueue) Name() string {
  114. return q.name
  115. }
  116. func init() {
  117. queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue
  118. }