You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_disk.go 2.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package issues
  5. import (
  6. "encoding/json"
  7. "time"
  8. "code.gitea.io/gitea/modules/log"
  9. "github.com/lunny/levelqueue"
  10. )
  11. var (
  12. _ Queue = &LevelQueue{}
  13. )
  14. // LevelQueue implements a disk library queue
  15. type LevelQueue struct {
  16. indexer Indexer
  17. queue *levelqueue.Queue
  18. batchNumber int
  19. }
  20. // NewLevelQueue creates a ledis local queue
  21. func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
  22. queue, err := levelqueue.Open(dataDir)
  23. if err != nil {
  24. return nil, err
  25. }
  26. return &LevelQueue{
  27. indexer: indexer,
  28. queue: queue,
  29. batchNumber: batchNumber,
  30. }, nil
  31. }
  32. // Run starts to run the queue
  33. func (l *LevelQueue) Run() error {
  34. var i int
  35. var datas = make([]*IndexerData, 0, l.batchNumber)
  36. for {
  37. i++
  38. if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
  39. _ = l.indexer.Index(datas)
  40. datas = make([]*IndexerData, 0, l.batchNumber)
  41. i = 0
  42. continue
  43. }
  44. bs, err := l.queue.RPop()
  45. if err != nil {
  46. if err != levelqueue.ErrNotFound {
  47. log.Error("RPop: %v", err)
  48. }
  49. time.Sleep(time.Millisecond * 100)
  50. continue
  51. }
  52. if len(bs) == 0 {
  53. time.Sleep(time.Millisecond * 100)
  54. continue
  55. }
  56. var data IndexerData
  57. err = json.Unmarshal(bs, &data)
  58. if err != nil {
  59. log.Error("Unmarshal: %v", err)
  60. time.Sleep(time.Millisecond * 100)
  61. continue
  62. }
  63. log.Trace("LevelQueue: task found: %#v", data)
  64. if data.IsDelete {
  65. if data.ID > 0 {
  66. if err = l.indexer.Delete(data.ID); err != nil {
  67. log.Error("indexer.Delete: %v", err)
  68. }
  69. } else if len(data.IDs) > 0 {
  70. if err = l.indexer.Delete(data.IDs...); err != nil {
  71. log.Error("indexer.Delete: %v", err)
  72. }
  73. }
  74. time.Sleep(time.Millisecond * 10)
  75. continue
  76. }
  77. datas = append(datas, &data)
  78. time.Sleep(time.Millisecond * 10)
  79. }
  80. }
  81. // Push will push the indexer data to queue
  82. func (l *LevelQueue) Push(data *IndexerData) error {
  83. bs, err := json.Marshal(data)
  84. if err != nil {
  85. return err
  86. }
  87. return l.queue.LPush(bs)
  88. }