You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

base_levelqueue_common.go 2.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "fmt"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "code.gitea.io/gitea/modules/nosql"
  12. "gitea.com/lunny/levelqueue"
  13. "github.com/syndtr/goleveldb/leveldb"
  14. )
  15. // baseLevelQueuePushPoper is the common interface for levelqueue.Queue and levelqueue.UniqueQueue
  16. type baseLevelQueuePushPoper interface {
  17. RPush(data []byte) error
  18. LPop() ([]byte, error)
  19. Len() int64
  20. }
  21. type baseLevelQueueCommonImpl struct {
  22. length int
  23. internalFunc func() baseLevelQueuePushPoper
  24. mu *sync.Mutex
  25. }
  26. func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) error {
  27. return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
  28. if q.mu != nil {
  29. q.mu.Lock()
  30. defer q.mu.Unlock()
  31. }
  32. cnt := int(q.internalFunc().Len())
  33. if cnt >= q.length {
  34. return true, nil
  35. }
  36. retry, err = false, q.internalFunc().RPush(data)
  37. if err == levelqueue.ErrAlreadyInQueue {
  38. err = ErrAlreadyInQueue
  39. }
  40. return retry, err
  41. })
  42. }
  43. func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error) {
  44. return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
  45. if q.mu != nil {
  46. q.mu.Lock()
  47. defer q.mu.Unlock()
  48. }
  49. data, err = q.internalFunc().LPop()
  50. if err == levelqueue.ErrNotFound {
  51. return true, nil, nil
  52. }
  53. if err != nil {
  54. return false, nil, err
  55. }
  56. return false, data, nil
  57. })
  58. }
  59. func baseLevelQueueCommon(cfg *BaseConfig, mu *sync.Mutex, internalFunc func() baseLevelQueuePushPoper) *baseLevelQueueCommonImpl {
  60. return &baseLevelQueueCommonImpl{length: cfg.Length, mu: mu, internalFunc: internalFunc}
  61. }
  62. func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {
  63. if cfg.ConnStr == "" { // use data dir as conn str
  64. if !filepath.IsAbs(cfg.DataFullDir) {
  65. return "", nil, fmt.Errorf("invalid leveldb data dir (not absolute): %q", cfg.DataFullDir)
  66. }
  67. conn = cfg.DataFullDir
  68. } else {
  69. if !strings.HasPrefix(cfg.ConnStr, "leveldb://") {
  70. return "", nil, fmt.Errorf("invalid leveldb connection string: %q", cfg.ConnStr)
  71. }
  72. conn = cfg.ConnStr
  73. }
  74. for i := 0; i < 10; i++ {
  75. if db, err = nosql.GetManager().GetLevelDB(conn); err == nil {
  76. break
  77. }
  78. time.Sleep(1 * time.Second)
  79. }
  80. return conn, db, err
  81. }