You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

base_redis.go 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "sync"
  7. "time"
  8. "code.gitea.io/gitea/modules/graceful"
  9. "code.gitea.io/gitea/modules/log"
  10. "code.gitea.io/gitea/modules/nosql"
  11. "github.com/redis/go-redis/v9"
  12. )
  13. type baseRedis struct {
  14. client redis.UniversalClient
  15. isUnique bool
  16. cfg *BaseConfig
  17. mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together
  18. }
  19. var _ baseQueue = (*baseRedis)(nil)
  20. func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
  21. client := nosql.GetManager().GetRedisClient(cfg.ConnStr)
  22. var err error
  23. for i := 0; i < 10; i++ {
  24. err = client.Ping(graceful.GetManager().ShutdownContext()).Err()
  25. if err == nil {
  26. break
  27. }
  28. log.Warn("Redis is not ready, waiting for 1 second to retry: %v", err)
  29. time.Sleep(time.Second)
  30. }
  31. if err != nil {
  32. return nil, err
  33. }
  34. return &baseRedis{cfg: cfg, client: client, isUnique: unique}, nil
  35. }
  36. func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) {
  37. return newBaseRedisGeneric(cfg, false)
  38. }
  39. func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) {
  40. return newBaseRedisGeneric(cfg, true)
  41. }
  42. func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
  43. return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
  44. q.mu.Lock()
  45. defer q.mu.Unlock()
  46. cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
  47. if err != nil {
  48. return false, err
  49. }
  50. if int(cnt) >= q.cfg.Length {
  51. return true, nil
  52. }
  53. if q.isUnique {
  54. added, err := q.client.SAdd(ctx, q.cfg.SetFullName, data).Result()
  55. if err != nil {
  56. return false, err
  57. }
  58. if added == 0 {
  59. return false, ErrAlreadyInQueue
  60. }
  61. }
  62. return false, q.client.RPush(ctx, q.cfg.QueueFullName, data).Err()
  63. })
  64. }
  65. func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) {
  66. return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
  67. q.mu.Lock()
  68. defer q.mu.Unlock()
  69. data, err = q.client.LPop(ctx, q.cfg.QueueFullName).Bytes()
  70. if err == redis.Nil {
  71. return true, nil, nil
  72. }
  73. if err != nil {
  74. return true, nil, nil
  75. }
  76. if q.isUnique {
  77. // the data has been popped, even if there is any error we can't do anything
  78. _ = q.client.SRem(ctx, q.cfg.SetFullName, data).Err()
  79. }
  80. return false, data, err
  81. })
  82. }
  83. func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) {
  84. q.mu.Lock()
  85. defer q.mu.Unlock()
  86. if !q.isUnique {
  87. return false, nil
  88. }
  89. return q.client.SIsMember(ctx, q.cfg.SetFullName, data).Result()
  90. }
  91. func (q *baseRedis) Len(ctx context.Context) (int, error) {
  92. q.mu.Lock()
  93. defer q.mu.Unlock()
  94. cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
  95. return int(cnt), err
  96. }
  97. func (q *baseRedis) Close() error {
  98. q.mu.Lock()
  99. defer q.mu.Unlock()
  100. return q.client.Close()
  101. }
  102. func (q *baseRedis) RemoveAll(ctx context.Context) error {
  103. q.mu.Lock()
  104. defer q.mu.Unlock()
  105. c1 := q.client.Del(ctx, q.cfg.QueueFullName)
  106. // the "set" must be cleared after the "list" because there is no transaction.
  107. // it's better to have duplicate items than losing items.
  108. c2 := q.client.Del(ctx, q.cfg.SetFullName)
  109. if c1.Err() != nil {
  110. return c1.Err()
  111. }
  112. if c2.Err() != nil {
  113. return c2.Err()
  114. }
  115. return nil // actually, checking errors doesn't make sense here because the state could be out-of-sync
  116. }