You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_disk.go 3.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "code.gitea.io/gitea/modules/nosql"
  8. "gitea.com/lunny/levelqueue"
  9. )
  10. // LevelUniqueQueueType is the type for level queue
  11. const LevelUniqueQueueType Type = "unique-level"
// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
type LevelUniqueQueueConfiguration struct {
	// Embedded settings; NewLevelUniqueQueue hands these on to
	// NewByteFIFOUniqueQueue.
	ByteFIFOQueueConfiguration
	// DataDir is used as the connection when ConnectionString is empty.
	DataDir string
	// ConnectionString identifies the LevelDB to open. When set it takes
	// priority over DataDir.
	ConnectionString string
	// QueueName is passed as the key prefix when the underlying
	// levelqueue unique queue is created.
	QueueName string
}
// LevelUniqueQueue is a unique queue kept on disk. It embeds the
// ByteFIFOUniqueQueue that NewLevelUniqueQueue builds on top of a
// LevelUniqueQueueByteFIFO.
type LevelUniqueQueue struct {
	*ByteFIFOUniqueQueue
}
  23. // NewLevelUniqueQueue creates a ledis local queue
  24. //
  25. // Please note that this Queue does not guarantee that a particular
  26. // task cannot be processed twice or more at the same time. Uniqueness is
  27. // only guaranteed whilst the task is waiting in the queue.
  28. func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  29. configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg)
  30. if err != nil {
  31. return nil, err
  32. }
  33. config := configInterface.(LevelUniqueQueueConfiguration)
  34. if len(config.ConnectionString) == 0 {
  35. config.ConnectionString = config.DataDir
  36. }
  37. config.WaitOnEmpty = true
  38. byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
  39. if err != nil {
  40. return nil, err
  41. }
  42. byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
  43. if err != nil {
  44. return nil, err
  45. }
  46. queue := &LevelUniqueQueue{
  47. ByteFIFOUniqueQueue: byteFIFOQueue,
  48. }
  49. queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar)
  50. return queue, nil
  51. }
  52. var _ UniqueByteFIFO = &LevelUniqueQueueByteFIFO{}
// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
type LevelUniqueQueueByteFIFO struct {
	// internal is the levelqueue unique queue that every method delegates to.
	internal *levelqueue.UniqueQueue
	// connection is the string the LevelDB was obtained with; it is kept so
	// that Close can pass it back to the nosql manager.
	connection string
}
  58. // NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
  59. func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error) {
  60. db, err := nosql.GetManager().GetLevelDB(connection)
  61. if err != nil {
  62. return nil, err
  63. }
  64. internal, err := levelqueue.NewUniqueQueue(db, []byte(prefix), []byte(prefix+"-unique"), false)
  65. if err != nil {
  66. return nil, err
  67. }
  68. return &LevelUniqueQueueByteFIFO{
  69. connection: connection,
  70. internal: internal,
  71. }, nil
  72. }
  73. // PushFunc pushes data to the end of the fifo and calls the callback if it is added
  74. func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
  75. return fifo.internal.LPushFunc(data, fn)
  76. }
  77. // Pop pops data from the start of the fifo
  78. func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
  79. data, err := fifo.internal.RPop()
  80. if err != nil && err != levelqueue.ErrNotFound {
  81. return nil, err
  82. }
  83. return data, nil
  84. }
  85. // Len returns the length of the fifo
  86. func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 {
  87. return fifo.internal.Len()
  88. }
  89. // Has returns whether the fifo contains this data
  90. func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
  91. return fifo.internal.Has(data)
  92. }
  93. // Close this fifo
  94. func (fifo *LevelUniqueQueueByteFIFO) Close() error {
  95. err := fifo.internal.Close()
  96. _ = nosql.GetManager().CloseLevelDB(fifo.connection)
  97. return err
  98. }
  99. func init() {
  100. queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
  101. }