You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_disk.go 2.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "gitea.com/lunny/levelqueue"
  7. )
  8. // LevelQueueType is the type for level queue
  9. const LevelQueueType Type = "level"
  10. // LevelQueueConfiguration is the configuration for a LevelQueue
  11. type LevelQueueConfiguration struct {
  12. ByteFIFOQueueConfiguration
  13. DataDir string
  14. }
  15. // LevelQueue implements a disk library queue
  16. type LevelQueue struct {
  17. *ByteFIFOQueue
  18. }
  19. // NewLevelQueue creates a ledis local queue
  20. func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  21. configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
  22. if err != nil {
  23. return nil, err
  24. }
  25. config := configInterface.(LevelQueueConfiguration)
  26. byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir)
  27. if err != nil {
  28. return nil, err
  29. }
  30. byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
  31. if err != nil {
  32. return nil, err
  33. }
  34. queue := &LevelQueue{
  35. ByteFIFOQueue: byteFIFOQueue,
  36. }
  37. queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
  38. return queue, nil
  39. }
  40. var _ (ByteFIFO) = &LevelQueueByteFIFO{}
  41. // LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
  42. type LevelQueueByteFIFO struct {
  43. internal *levelqueue.Queue
  44. }
  45. // NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
  46. func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) {
  47. internal, err := levelqueue.Open(dataDir)
  48. if err != nil {
  49. return nil, err
  50. }
  51. return &LevelQueueByteFIFO{
  52. internal: internal,
  53. }, nil
  54. }
  55. // PushFunc will push data into the fifo
  56. func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
  57. if fn != nil {
  58. if err := fn(); err != nil {
  59. return err
  60. }
  61. }
  62. return fifo.internal.LPush(data)
  63. }
  64. // Pop pops data from the start of the fifo
  65. func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
  66. data, err := fifo.internal.RPop()
  67. if err != nil && err != levelqueue.ErrNotFound {
  68. return nil, err
  69. }
  70. return data, nil
  71. }
  72. // Close this fifo
  73. func (fifo *LevelQueueByteFIFO) Close() error {
  74. return fifo.internal.Close()
  75. }
  76. // Len returns the length of the fifo
  77. func (fifo *LevelQueueByteFIFO) Len() int64 {
  78. return fifo.internal.Len()
  79. }
  80. func init() {
  81. queuesMap[LevelQueueType] = NewLevelQueue
  82. }