You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

unique_queue_disk.go 2.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "gitea.com/lunny/levelqueue"
  7. )
  8. // LevelUniqueQueueType is the type for level queue
  9. const LevelUniqueQueueType Type = "unique-level"
  10. // LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
  11. type LevelUniqueQueueConfiguration struct {
  12. ByteFIFOQueueConfiguration
  13. DataDir string
  14. }
  15. // LevelUniqueQueue implements a disk library queue
  16. type LevelUniqueQueue struct {
  17. *ByteFIFOUniqueQueue
  18. }
  19. // NewLevelUniqueQueue creates a ledis local queue
  20. //
  21. // Please note that this Queue does not guarantee that a particular
  22. // task cannot be processed twice or more at the same time. Uniqueness is
  23. // only guaranteed whilst the task is waiting in the queue.
  24. func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  25. configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg)
  26. if err != nil {
  27. return nil, err
  28. }
  29. config := configInterface.(LevelUniqueQueueConfiguration)
  30. byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir)
  31. if err != nil {
  32. return nil, err
  33. }
  34. byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
  35. if err != nil {
  36. return nil, err
  37. }
  38. queue := &LevelUniqueQueue{
  39. ByteFIFOUniqueQueue: byteFIFOQueue,
  40. }
  41. queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar)
  42. return queue, nil
  43. }
  44. var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{}
  45. // LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
  46. type LevelUniqueQueueByteFIFO struct {
  47. internal *levelqueue.UniqueQueue
  48. }
  49. // NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
  50. func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) {
  51. internal, err := levelqueue.OpenUnique(dataDir)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return &LevelUniqueQueueByteFIFO{
  56. internal: internal,
  57. }, nil
  58. }
  59. // PushFunc pushes data to the end of the fifo and calls the callback if it is added
  60. func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
  61. return fifo.internal.LPushFunc(data, fn)
  62. }
  63. // Pop pops data from the start of the fifo
  64. func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
  65. data, err := fifo.internal.RPop()
  66. if err != nil && err != levelqueue.ErrNotFound {
  67. return nil, err
  68. }
  69. return data, nil
  70. }
  71. // Len returns the length of the fifo
  72. func (fifo *LevelUniqueQueueByteFIFO) Len() int64 {
  73. return fifo.internal.Len()
  74. }
  75. // Has returns whether the fifo contains this data
  76. func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
  77. return fifo.internal.Has(data)
  78. }
  79. // Close this fifo
  80. func (fifo *LevelUniqueQueueByteFIFO) Close() error {
  81. return fifo.internal.Close()
  82. }
  83. func init() {
  84. queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
  85. }