You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_redis.go 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "errors"
  7. "strings"
  8. "code.gitea.io/gitea/modules/log"
  9. "github.com/go-redis/redis"
  10. )
  11. // RedisQueueType is the type for redis queue
  12. const RedisQueueType Type = "redis"
  13. // RedisQueueConfiguration is the configuration for the redis queue
  14. type RedisQueueConfiguration struct {
  15. ByteFIFOQueueConfiguration
  16. RedisByteFIFOConfiguration
  17. }
  18. // RedisQueue redis queue
  19. type RedisQueue struct {
  20. *ByteFIFOQueue
  21. }
  22. // NewRedisQueue creates single redis or cluster redis queue
  23. func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  24. configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
  25. if err != nil {
  26. return nil, err
  27. }
  28. config := configInterface.(RedisQueueConfiguration)
  29. byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
  30. if err != nil {
  31. return nil, err
  32. }
  33. byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
  34. if err != nil {
  35. return nil, err
  36. }
  37. queue := &RedisQueue{
  38. ByteFIFOQueue: byteFIFOQueue,
  39. }
  40. queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
  41. return queue, nil
  42. }
  43. type redisClient interface {
  44. RPush(key string, args ...interface{}) *redis.IntCmd
  45. LPop(key string) *redis.StringCmd
  46. LLen(key string) *redis.IntCmd
  47. SAdd(key string, members ...interface{}) *redis.IntCmd
  48. SRem(key string, members ...interface{}) *redis.IntCmd
  49. SIsMember(key string, member interface{}) *redis.BoolCmd
  50. Ping() *redis.StatusCmd
  51. Close() error
  52. }
  53. var _ (ByteFIFO) = &RedisByteFIFO{}
  54. // RedisByteFIFO represents a ByteFIFO formed from a redisClient
  55. type RedisByteFIFO struct {
  56. client redisClient
  57. queueName string
  58. }
  59. // RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
  60. type RedisByteFIFOConfiguration struct {
  61. Network string
  62. Addresses string
  63. Password string
  64. DBIndex int
  65. QueueName string
  66. }
  67. // NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
  68. func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) {
  69. fifo := &RedisByteFIFO{
  70. queueName: config.QueueName,
  71. }
  72. dbs := strings.Split(config.Addresses, ",")
  73. if len(dbs) == 0 {
  74. return nil, errors.New("no redis host specified")
  75. } else if len(dbs) == 1 {
  76. fifo.client = redis.NewClient(&redis.Options{
  77. Network: config.Network,
  78. Addr: strings.TrimSpace(dbs[0]), // use default Addr
  79. Password: config.Password, // no password set
  80. DB: config.DBIndex, // use default DB
  81. })
  82. } else {
  83. fifo.client = redis.NewClusterClient(&redis.ClusterOptions{
  84. Addrs: dbs,
  85. })
  86. }
  87. if err := fifo.client.Ping().Err(); err != nil {
  88. return nil, err
  89. }
  90. return fifo, nil
  91. }
  92. // PushFunc pushes data to the end of the fifo and calls the callback if it is added
  93. func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error {
  94. if fn != nil {
  95. if err := fn(); err != nil {
  96. return err
  97. }
  98. }
  99. return fifo.client.RPush(fifo.queueName, data).Err()
  100. }
  101. // Pop pops data from the start of the fifo
  102. func (fifo *RedisByteFIFO) Pop() ([]byte, error) {
  103. data, err := fifo.client.LPop(fifo.queueName).Bytes()
  104. if err == nil || err == redis.Nil {
  105. return data, nil
  106. }
  107. return data, err
  108. }
  109. // Close this fifo
  110. func (fifo *RedisByteFIFO) Close() error {
  111. return fifo.client.Close()
  112. }
  113. // Len returns the length of the fifo
  114. func (fifo *RedisByteFIFO) Len() int64 {
  115. val, err := fifo.client.LLen(fifo.queueName).Result()
  116. if err != nil {
  117. log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
  118. return -1
  119. }
  120. return val
  121. }
  122. func init() {
  123. queuesMap[RedisQueueType] = NewRedisQueue
  124. }