You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_redis.go 3.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "fmt"
  7. "code.gitea.io/gitea/modules/graceful"
  8. "github.com/go-redis/redis/v8"
  9. )
  10. // RedisUniqueQueueType is the type for redis queue
  11. const RedisUniqueQueueType Type = "unique-redis"
  12. // RedisUniqueQueue redis queue
  13. type RedisUniqueQueue struct {
  14. *ByteFIFOUniqueQueue
  15. }
  16. // RedisUniqueQueueConfiguration is the configuration for the redis queue
  17. type RedisUniqueQueueConfiguration struct {
  18. ByteFIFOQueueConfiguration
  19. RedisUniqueByteFIFOConfiguration
  20. }
  21. // NewRedisUniqueQueue creates single redis or cluster redis queue.
  22. //
  23. // Please note that this Queue does not guarantee that a particular
  24. // task cannot be processed twice or more at the same time. Uniqueness is
  25. // only guaranteed whilst the task is waiting in the queue.
  26. func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  27. configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
  28. if err != nil {
  29. return nil, err
  30. }
  31. config := configInterface.(RedisUniqueQueueConfiguration)
  32. byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
  33. if err != nil {
  34. return nil, err
  35. }
  36. if len(byteFIFO.setName) == 0 {
  37. byteFIFO.setName = byteFIFO.queueName + "_unique"
  38. }
  39. byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
  40. if err != nil {
  41. return nil, err
  42. }
  43. byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))
  44. queue := &RedisUniqueQueue{
  45. ByteFIFOUniqueQueue: byteFIFOQueue,
  46. }
  47. queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
  48. return queue, nil
  49. }
  50. var _ UniqueByteFIFO = &RedisUniqueByteFIFO{}
  51. // RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
  52. type RedisUniqueByteFIFO struct {
  53. RedisByteFIFO
  54. setName string
  55. }
  56. // RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
  57. type RedisUniqueByteFIFOConfiguration struct {
  58. RedisByteFIFOConfiguration
  59. SetName string
  60. }
  61. // NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
  62. func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
  63. internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
  64. if err != nil {
  65. return nil, err
  66. }
  67. fifo := &RedisUniqueByteFIFO{
  68. RedisByteFIFO: *internal,
  69. setName: config.SetName,
  70. }
  71. return fifo, nil
  72. }
  73. // PushFunc pushes data to the end of the fifo and calls the callback if it is added
  74. func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
  75. added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result()
  76. if err != nil {
  77. return err
  78. }
  79. if added == 0 {
  80. return ErrAlreadyInQueue
  81. }
  82. if fn != nil {
  83. if err := fn(); err != nil {
  84. return err
  85. }
  86. }
  87. return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
  88. }
  89. // Pop pops data from the start of the fifo
  90. func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
  91. data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
  92. if err != nil && err != redis.Nil {
  93. return data, err
  94. }
  95. if len(data) == 0 {
  96. return data, nil
  97. }
  98. err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err()
  99. return data, err
  100. }
  101. // Has returns whether the fifo contains this data
  102. func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
  103. return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result()
  104. }
  105. func init() {
  106. queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
  107. }