You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue_redis.go 2.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package task
  5. import (
  6. "encoding/json"
  7. "errors"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "code.gitea.io/gitea/models"
  12. "code.gitea.io/gitea/modules/log"
  13. "github.com/go-redis/redis"
  14. )
  15. var (
  16. _ Queue = &RedisQueue{}
  17. )
  18. type redisClient interface {
  19. RPush(key string, args ...interface{}) *redis.IntCmd
  20. LPop(key string) *redis.StringCmd
  21. Ping() *redis.StatusCmd
  22. }
  23. // RedisQueue redis queue
  24. type RedisQueue struct {
  25. client redisClient
  26. queueName string
  27. closeChan chan bool
  28. }
  // parseConnStr extracts Redis connection settings from a whitespace-separated
  // list of key=value fields, e.g. "addrs=127.0.0.1:6379 password=secret db=0".
  //
  // Recognised keys (matched case-insensitively) are "addrs", "password" and
  // "db". Fields without an '=' and fields with any other key are ignored. Only
  // the first '=' in a field separates key from value, so values may themselves
  // contain '='. When a key appears more than once the last occurrence wins.
  //
  // If a "db" value is not a valid integer, parsing stops immediately and the
  // strconv error is returned together with whatever was parsed up to that
  // point.
  func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
  	for _, field := range strings.Fields(connStr) {
  		sep := strings.Index(field, "=")
  		if sep < 0 {
  			// Not a key=value pair.
  			continue
  		}
  		key, value := field[:sep], field[sep+1:]

  		switch strings.ToLower(key) {
  		case "addrs":
  			addrs = value
  		case "password":
  			password = value
  		case "db":
  			if dbIdx, err = strconv.Atoi(value); err != nil {
  				return addrs, password, dbIdx, err
  			}
  		}
  	}
  	return addrs, password, dbIdx, err
  }
  50. // NewRedisQueue creates single redis or cluster redis queue
  51. func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) {
  52. dbs := strings.Split(addrs, ",")
  53. var queue = RedisQueue{
  54. queueName: "task_queue",
  55. closeChan: make(chan bool),
  56. }
  57. if len(dbs) == 0 {
  58. return nil, errors.New("no redis host found")
  59. } else if len(dbs) == 1 {
  60. queue.client = redis.NewClient(&redis.Options{
  61. Addr: strings.TrimSpace(dbs[0]), // use default Addr
  62. Password: password, // no password set
  63. DB: dbIdx, // use default DB
  64. })
  65. } else {
  66. // cluster will ignore db
  67. queue.client = redis.NewClusterClient(&redis.ClusterOptions{
  68. Addrs: dbs,
  69. Password: password,
  70. })
  71. }
  72. if err := queue.client.Ping().Err(); err != nil {
  73. return nil, err
  74. }
  75. return &queue, nil
  76. }
  77. // Run starts to run the queue
  78. func (r *RedisQueue) Run() error {
  79. for {
  80. select {
  81. case <-r.closeChan:
  82. return nil
  83. case <-time.After(time.Millisecond * 100):
  84. }
  85. bs, err := r.client.LPop(r.queueName).Bytes()
  86. if err != nil {
  87. if err != redis.Nil {
  88. log.Error("LPop failed: %v", err)
  89. }
  90. time.Sleep(time.Millisecond * 100)
  91. continue
  92. }
  93. var task models.Task
  94. err = json.Unmarshal(bs, &task)
  95. if err != nil {
  96. log.Error("Unmarshal task failed: %s", err.Error())
  97. } else {
  98. err = Run(&task)
  99. if err != nil {
  100. log.Error("Run task failed: %s", err.Error())
  101. }
  102. }
  103. }
  104. }
  105. // Push implements Queue
  106. func (r *RedisQueue) Push(task *models.Task) error {
  107. bs, err := json.Marshal(task)
  108. if err != nil {
  109. return err
  110. }
  111. return r.client.RPush(r.queueName, bs).Err()
  112. }
  113. // Stop stop the queue
  114. func (r *RedisQueue) Stop() {
  115. r.closeChan <- true
  116. }