You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_redis.go 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import "github.com/go-redis/redis"
  6. // RedisUniqueQueueType is the type for redis queue
  7. const RedisUniqueQueueType Type = "unique-redis"
  8. // RedisUniqueQueue redis queue
  9. type RedisUniqueQueue struct {
  10. *ByteFIFOUniqueQueue
  11. }
  12. // RedisUniqueQueueConfiguration is the configuration for the redis queue
  13. type RedisUniqueQueueConfiguration struct {
  14. ByteFIFOQueueConfiguration
  15. RedisUniqueByteFIFOConfiguration
  16. }
  17. // NewRedisUniqueQueue creates single redis or cluster redis queue.
  18. //
  19. // Please note that this Queue does not guarantee that a particular
  20. // task cannot be processed twice or more at the same time. Uniqueness is
  21. // only guaranteed whilst the task is waiting in the queue.
  22. func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  23. configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
  24. if err != nil {
  25. return nil, err
  26. }
  27. config := configInterface.(RedisUniqueQueueConfiguration)
  28. byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
  29. if err != nil {
  30. return nil, err
  31. }
  32. if len(byteFIFO.setName) == 0 {
  33. byteFIFO.setName = byteFIFO.queueName + "_unique"
  34. }
  35. byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
  36. if err != nil {
  37. return nil, err
  38. }
  39. queue := &RedisUniqueQueue{
  40. ByteFIFOUniqueQueue: byteFIFOQueue,
  41. }
  42. queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
  43. return queue, nil
  44. }
  45. var _ (UniqueByteFIFO) = &RedisUniqueByteFIFO{}
  46. // RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
  47. type RedisUniqueByteFIFO struct {
  48. RedisByteFIFO
  49. setName string
  50. }
  51. // RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
  52. type RedisUniqueByteFIFOConfiguration struct {
  53. RedisByteFIFOConfiguration
  54. SetName string
  55. }
  56. // NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
  57. func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
  58. internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
  59. if err != nil {
  60. return nil, err
  61. }
  62. fifo := &RedisUniqueByteFIFO{
  63. RedisByteFIFO: *internal,
  64. setName: config.SetName,
  65. }
  66. return fifo, nil
  67. }
  68. // PushFunc pushes data to the end of the fifo and calls the callback if it is added
  69. func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
  70. added, err := fifo.client.SAdd(fifo.setName, data).Result()
  71. if err != nil {
  72. return err
  73. }
  74. if added == 0 {
  75. return ErrAlreadyInQueue
  76. }
  77. if fn != nil {
  78. if err := fn(); err != nil {
  79. return err
  80. }
  81. }
  82. return fifo.client.RPush(fifo.queueName, data).Err()
  83. }
  84. // Pop pops data from the start of the fifo
  85. func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
  86. data, err := fifo.client.LPop(fifo.queueName).Bytes()
  87. if err != nil && err != redis.Nil {
  88. return data, err
  89. }
  90. if len(data) == 0 {
  91. return data, nil
  92. }
  93. err = fifo.client.SRem(fifo.setName, data).Err()
  94. return data, err
  95. }
  96. // Has returns whether the fifo contains this data
  97. func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
  98. return fifo.client.SIsMember(fifo.setName, data).Result()
  99. }
  100. func init() {
  101. queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
  102. }