You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_redis.go 3.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package issues
  5. import (
  6. "encoding/json"
  7. "errors"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "code.gitea.io/gitea/modules/log"
  12. "github.com/go-redis/redis"
  13. )
  14. var (
  15. _ Queue = &RedisQueue{}
  16. )
  17. type redisClient interface {
  18. RPush(key string, args ...interface{}) *redis.IntCmd
  19. LPop(key string) *redis.StringCmd
  20. Ping() *redis.StatusCmd
  21. }
  22. // RedisQueue redis queue
  23. type RedisQueue struct {
  24. client redisClient
  25. queueName string
  26. indexer Indexer
  27. batchNumber int
  28. }
  29. func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
  30. fields := strings.Fields(connStr)
  31. for _, f := range fields {
  32. items := strings.SplitN(f, "=", 2)
  33. if len(items) < 2 {
  34. continue
  35. }
  36. switch strings.ToLower(items[0]) {
  37. case "addrs":
  38. addrs = items[1]
  39. case "password":
  40. password = items[1]
  41. case "db":
  42. dbIdx, err = strconv.Atoi(items[1])
  43. if err != nil {
  44. return
  45. }
  46. }
  47. }
  48. return
  49. }
  50. // NewRedisQueue creates single redis or cluster redis queue
  51. func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) {
  52. dbs := strings.Split(addrs, ",")
  53. var queue = RedisQueue{
  54. queueName: "issue_indexer_queue",
  55. indexer: indexer,
  56. batchNumber: batchNumber,
  57. }
  58. if len(dbs) == 0 {
  59. return nil, errors.New("no redis host found")
  60. } else if len(dbs) == 1 {
  61. queue.client = redis.NewClient(&redis.Options{
  62. Addr: strings.TrimSpace(dbs[0]), // use default Addr
  63. Password: password, // no password set
  64. DB: dbIdx, // use default DB
  65. })
  66. } else {
  67. queue.client = redis.NewClusterClient(&redis.ClusterOptions{
  68. Addrs: dbs,
  69. })
  70. }
  71. if err := queue.client.Ping().Err(); err != nil {
  72. return nil, err
  73. }
  74. return &queue, nil
  75. }
  76. // Run runs the redis queue
  77. func (r *RedisQueue) Run() error {
  78. var i int
  79. var datas = make([]*IndexerData, 0, r.batchNumber)
  80. for {
  81. bs, err := r.client.LPop(r.queueName).Bytes()
  82. if err != nil && err != redis.Nil {
  83. log.Error("LPop faile: %v", err)
  84. time.Sleep(time.Millisecond * 100)
  85. continue
  86. }
  87. i++
  88. if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) {
  89. _ = r.indexer.Index(datas)
  90. datas = make([]*IndexerData, 0, r.batchNumber)
  91. i = 0
  92. }
  93. if len(bs) == 0 {
  94. time.Sleep(time.Millisecond * 100)
  95. continue
  96. }
  97. var data IndexerData
  98. err = json.Unmarshal(bs, &data)
  99. if err != nil {
  100. log.Error("Unmarshal: %v", err)
  101. time.Sleep(time.Millisecond * 100)
  102. continue
  103. }
  104. log.Trace("RedisQueue: task found: %#v", data)
  105. if data.IsDelete {
  106. if data.ID > 0 {
  107. if err = r.indexer.Delete(data.ID); err != nil {
  108. log.Error("indexer.Delete: %v", err)
  109. }
  110. } else if len(data.IDs) > 0 {
  111. if err = r.indexer.Delete(data.IDs...); err != nil {
  112. log.Error("indexer.Delete: %v", err)
  113. }
  114. }
  115. time.Sleep(time.Millisecond * 100)
  116. continue
  117. }
  118. datas = append(datas, &data)
  119. time.Sleep(time.Millisecond * 100)
  120. }
  121. }
  122. // Push implements Queue
  123. func (r *RedisQueue) Push(data *IndexerData) error {
  124. bs, err := json.Marshal(data)
  125. if err != nil {
  126. return err
  127. }
  128. return r.client.RPush(r.queueName, bs).Err()
  129. }