You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_redis.go 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "github.com/go-redis/redis/v8"
  7. )
  8. // RedisUniqueQueueType is the type for redis queue
  9. const RedisUniqueQueueType Type = "unique-redis"
  10. // RedisUniqueQueue redis queue
  11. type RedisUniqueQueue struct {
  12. *ByteFIFOUniqueQueue
  13. }
  14. // RedisUniqueQueueConfiguration is the configuration for the redis queue
  15. type RedisUniqueQueueConfiguration struct {
  16. ByteFIFOQueueConfiguration
  17. RedisUniqueByteFIFOConfiguration
  18. }
  19. // NewRedisUniqueQueue creates single redis or cluster redis queue.
  20. //
  21. // Please note that this Queue does not guarantee that a particular
  22. // task cannot be processed twice or more at the same time. Uniqueness is
  23. // only guaranteed whilst the task is waiting in the queue.
  24. func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  25. configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
  26. if err != nil {
  27. return nil, err
  28. }
  29. config := configInterface.(RedisUniqueQueueConfiguration)
  30. byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
  31. if err != nil {
  32. return nil, err
  33. }
  34. if len(byteFIFO.setName) == 0 {
  35. byteFIFO.setName = byteFIFO.queueName + "_unique"
  36. }
  37. byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
  38. if err != nil {
  39. return nil, err
  40. }
  41. queue := &RedisUniqueQueue{
  42. ByteFIFOUniqueQueue: byteFIFOQueue,
  43. }
  44. queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
  45. return queue, nil
  46. }
  47. var _ UniqueByteFIFO = &RedisUniqueByteFIFO{}
  48. // RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
  49. type RedisUniqueByteFIFO struct {
  50. RedisByteFIFO
  51. setName string
  52. }
  53. // RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
  54. type RedisUniqueByteFIFOConfiguration struct {
  55. RedisByteFIFOConfiguration
  56. SetName string
  57. }
  58. // NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
  59. func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
  60. internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
  61. if err != nil {
  62. return nil, err
  63. }
  64. fifo := &RedisUniqueByteFIFO{
  65. RedisByteFIFO: *internal,
  66. setName: config.SetName,
  67. }
  68. return fifo, nil
  69. }
  70. // PushFunc pushes data to the end of the fifo and calls the callback if it is added
  71. func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
  72. added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
  73. if err != nil {
  74. return err
  75. }
  76. if added == 0 {
  77. return ErrAlreadyInQueue
  78. }
  79. if fn != nil {
  80. if err := fn(); err != nil {
  81. return err
  82. }
  83. }
  84. return fifo.client.RPush(ctx, fifo.queueName, data).Err()
  85. }
  86. // PushBack pushes data to the top of the fifo
  87. func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error {
  88. added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
  89. if err != nil {
  90. return err
  91. }
  92. if added == 0 {
  93. return ErrAlreadyInQueue
  94. }
  95. return fifo.client.LPush(ctx, fifo.queueName, data).Err()
  96. }
  97. // Pop pops data from the start of the fifo
  98. func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
  99. data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
  100. if err != nil && err != redis.Nil {
  101. return data, err
  102. }
  103. if len(data) == 0 {
  104. return data, nil
  105. }
  106. err = fifo.client.SRem(ctx, fifo.setName, data).Err()
  107. return data, err
  108. }
  109. // Has returns whether the fifo contains this data
  110. func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
  111. return fifo.client.SIsMember(ctx, fifo.setName, data).Result()
  112. }
  113. func init() {
  114. queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
  115. }