You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

base_levelqueue_unique.go 2.5KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "sync"
  7. "sync/atomic"
  8. "code.gitea.io/gitea/modules/nosql"
  9. "code.gitea.io/gitea/modules/queue/lqinternal"
  10. "gitea.com/lunny/levelqueue"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. )
  13. type baseLevelQueueUnique struct {
  14. internal atomic.Pointer[levelqueue.UniqueQueue]
  15. conn string
  16. cfg *BaseConfig
  17. db *leveldb.DB
  18. mu sync.Mutex // the levelqueue.UniqueQueue is not thread-safe, there is no mutex protecting the underlying queue&set together
  19. }
  20. var _ baseQueue = (*baseLevelQueueUnique)(nil)
  21. func newBaseLevelQueueUnique(cfg *BaseConfig) (baseQueue, error) {
  22. conn, db, err := prepareLevelDB(cfg)
  23. if err != nil {
  24. return nil, err
  25. }
  26. q := &baseLevelQueueUnique{conn: conn, cfg: cfg, db: db}
  27. lq, err := levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
  28. if err != nil {
  29. return nil, err
  30. }
  31. q.internal.Store(lq)
  32. return q, nil
  33. }
  34. func (q *baseLevelQueueUnique) PushItem(ctx context.Context, data []byte) error {
  35. c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
  36. return c.PushItem(ctx, data)
  37. }
  38. func (q *baseLevelQueueUnique) PopItem(ctx context.Context) ([]byte, error) {
  39. c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
  40. return c.PopItem(ctx)
  41. }
  42. func (q *baseLevelQueueUnique) HasItem(ctx context.Context, data []byte) (bool, error) {
  43. q.mu.Lock()
  44. defer q.mu.Unlock()
  45. return q.internal.Load().Has(data)
  46. }
  47. func (q *baseLevelQueueUnique) Len(ctx context.Context) (int, error) {
  48. q.mu.Lock()
  49. defer q.mu.Unlock()
  50. return int(q.internal.Load().Len()), nil
  51. }
  52. func (q *baseLevelQueueUnique) Close() error {
  53. q.mu.Lock()
  54. defer q.mu.Unlock()
  55. err := q.internal.Load().Close()
  56. q.db = nil // the db is not managed by us, it's managed by the nosql manager
  57. _ = nosql.GetManager().CloseLevelDB(q.conn)
  58. return err
  59. }
  60. func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
  61. q.mu.Lock()
  62. defer q.mu.Unlock()
  63. lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName))
  64. lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.SetFullName))
  65. lq, err := levelqueue.NewUniqueQueue(q.db, []byte(q.cfg.QueueFullName), []byte(q.cfg.SetFullName), false)
  66. if err != nil {
  67. return err
  68. }
  69. old := q.internal.Load()
  70. q.internal.Store(lq)
  71. _ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good
  72. return nil
  73. }