You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

queue.go 3.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package code
  5. import (
  6. "os"
  7. "code.gitea.io/gitea/models"
  8. "code.gitea.io/gitea/modules/graceful"
  9. "code.gitea.io/gitea/modules/log"
  10. "code.gitea.io/gitea/modules/setting"
  11. )
  12. type repoIndexerOperation struct {
  13. repoID int64
  14. deleted bool
  15. watchers []chan<- error
  16. }
  17. var repoIndexerOperationQueue chan repoIndexerOperation
  18. func initQueue(queueLength int) {
  19. repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
  20. }
  21. func processRepoIndexerOperationQueue(indexer Indexer) {
  22. for {
  23. select {
  24. case op := <-repoIndexerOperationQueue:
  25. var err error
  26. if op.deleted {
  27. if err = indexer.Delete(op.repoID); err != nil {
  28. log.Error("indexer.Delete: %v", err)
  29. }
  30. } else {
  31. if err = indexer.Index(op.repoID); err != nil {
  32. log.Error("indexer.Index: %v", err)
  33. }
  34. }
  35. for _, watcher := range op.watchers {
  36. watcher <- err
  37. }
  38. case <-graceful.GetManager().IsShutdown():
  39. log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
  40. return
  41. }
  42. }
  43. }
  44. // DeleteRepoFromIndexer remove all of a repository's entries from the indexer
  45. func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) {
  46. addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers})
  47. }
  48. // UpdateRepoIndexer update a repository's entries in the indexer
  49. func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
  50. addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers})
  51. }
  52. func addOperationToQueue(op repoIndexerOperation) {
  53. if !setting.Indexer.RepoIndexerEnabled {
  54. return
  55. }
  56. select {
  57. case repoIndexerOperationQueue <- op:
  58. break
  59. default:
  60. go func() {
  61. repoIndexerOperationQueue <- op
  62. }()
  63. }
  64. }
  65. // populateRepoIndexer populate the repo indexer with pre-existing data. This
  66. // should only be run when the indexer is created for the first time.
  67. func populateRepoIndexer() {
  68. log.Info("Populating the repo indexer with existing repositories")
  69. isShutdown := graceful.GetManager().IsShutdown()
  70. exist, err := models.IsTableNotEmpty("repository")
  71. if err != nil {
  72. log.Fatal("System error: %v", err)
  73. } else if !exist {
  74. return
  75. }
  76. // if there is any existing repo indexer metadata in the DB, delete it
  77. // since we are starting afresh. Also, xorm requires deletes to have a
  78. // condition, and we want to delete everything, thus 1=1.
  79. if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
  80. log.Fatal("System error: %v", err)
  81. }
  82. var maxRepoID int64
  83. if maxRepoID, err = models.GetMaxID("repository"); err != nil {
  84. log.Fatal("System error: %v", err)
  85. }
  86. // start with the maximum existing repo ID and work backwards, so that we
  87. // don't include repos that are created after gitea starts; such repos will
  88. // already be added to the indexer, and we don't need to add them again.
  89. for maxRepoID > 0 {
  90. select {
  91. case <-isShutdown:
  92. log.Info("Repository Indexer population shutdown before completion")
  93. return
  94. default:
  95. }
  96. ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
  97. if err != nil {
  98. log.Error("populateRepoIndexer: %v", err)
  99. return
  100. } else if len(ids) == 0 {
  101. break
  102. }
  103. for _, id := range ids {
  104. select {
  105. case <-isShutdown:
  106. log.Info("Repository Indexer population shutdown before completion")
  107. return
  108. default:
  109. }
  110. repoIndexerOperationQueue <- repoIndexerOperation{
  111. repoID: id,
  112. deleted: false,
  113. }
  114. maxRepoID = id - 1
  115. }
  116. }
  117. log.Info("Done (re)populating the repo indexer with existing repositories")
  118. }