You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

indexer.go 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. // Copyright 2018 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package issues
  4. import (
  5. "context"
  6. "fmt"
  7. "os"
  8. "runtime/pprof"
  9. "sync/atomic"
  10. "time"
  11. db_model "code.gitea.io/gitea/models/db"
  12. repo_model "code.gitea.io/gitea/models/repo"
  13. "code.gitea.io/gitea/modules/graceful"
  14. "code.gitea.io/gitea/modules/indexer/issues/bleve"
  15. "code.gitea.io/gitea/modules/indexer/issues/db"
  16. "code.gitea.io/gitea/modules/indexer/issues/elasticsearch"
  17. "code.gitea.io/gitea/modules/indexer/issues/internal"
  18. "code.gitea.io/gitea/modules/indexer/issues/meilisearch"
  19. "code.gitea.io/gitea/modules/log"
  20. "code.gitea.io/gitea/modules/optional"
  21. "code.gitea.io/gitea/modules/process"
  22. "code.gitea.io/gitea/modules/queue"
  23. "code.gitea.io/gitea/modules/setting"
  24. )
  // IndexerMetadata is the payload sent to the issue indexer queue, so it contains only IDs.
  // It may look weird because it has to stay compatible with the old queue data format;
  // for the same reason the JSON field names below must not be renamed.
  // If IsDelete is true, IDs lists the issues to delete from the index without querying the database.
  // If IsDelete is false, ID is the issue to index, so the indexer queries the database for its data.
  // Note that if the issue with that ID does not exist in the database, its index entry is deleted too,
  // even though IsDelete is false.
  //
  // Valid values:
  //   - IsDelete = true, IDs = [1, 2, 3], and ID is ignored
  //   - IsDelete = false, ID = 1, and IDs is ignored
  type IndexerMetadata struct {
  	ID       int64   `json:"id"`        // issue to (re)index when IsDelete is false
  	IsDelete bool    `json:"is_delete"` // selects between the two modes described above
  	IDs      []int64 `json:"ids"`       // issues to delete when IsDelete is true
  }
  var (
  	// issueIndexerQueue is the queue of IndexerMetadata items (issue IDs) waiting to be indexed or deleted.
  	issueIndexerQueue *queue.WorkerPoolQueue[*IndexerMetadata]
  	// globalIndexer is the global indexer; it is never nil.
  	// While the real indexer is not ready, it holds a dummy indexer whose methods return an error explaining that it is not ready.
  	// So it is always safe to use it as *globalIndexer.Load() and call its methods.
  	globalIndexer atomic.Pointer[internal.Indexer]
  	// dummyIndexer is the placeholder stored in globalIndexer by init, and stored again if bleve initialization panics.
  	dummyIndexer *internal.Indexer
  )
  47. func init() {
  48. i := internal.NewDummyIndexer()
  49. dummyIndexer = &i
  50. globalIndexer.Store(dummyIndexer)
  51. }
  // InitIssueIndexer initializes the issue indexer.
  //
  // It creates the "issue_indexer" queue right away. It then initializes the backend
  // selected by setting.Indexer.IssueType ("bleve", "elasticsearch", "db" or
  // "meilisearch") in a background goroutine. Until that goroutine stores the real
  // indexer, globalIndexer keeps holding the dummy indexer. Initialization failures
  // terminate the process via log.Fatal.
  //
  // If syncReindex is true, this call blocks until the init goroutine reports
  // completion (which comes after populating an index that did not exist yet) or
  // shutdown begins. Otherwise, if setting.Indexer.StartupTimeout > 0, a watcher
  // goroutine aborts the process when initialization takes longer than that.
  func InitIssueIndexer(syncReindex bool) {
  	ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
  	// Buffered (size 1) so the init goroutine's single send never blocks, even when
  	// nothing receives it (e.g. no watcher is started when StartupTimeout <= 0).
  	indexerInitWaitChannel := make(chan time.Duration, 1)

  	// Create the Queue
  	issueIndexerQueue = queue.CreateUniqueQueue(ctx, "issue_indexer", getIssueIndexerQueueHandler(ctx))
  	graceful.GetManager().RunAtTerminate(finished)

  	// Create the Indexer
  	go func() {
  		pprof.SetGoroutineLabels(ctx)
  		start := time.Now()
  		log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
  		var (
  			issueIndexer internal.Indexer
  			existed      bool
  			err          error
  		)
  		switch setting.Indexer.IssueType {
  		case "bleve":
  			// A panic while opening the bleve index most likely means corrupted index files.
  			// The deferred recover is scoped to this whole goroutine closure, not just this
  			// case, so it also fires for panics raised after the switch.
  			defer func() {
  				if err := recover(); err != nil {
  					log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2))
  					log.Error("The indexer files are likely corrupted and may need to be deleted")
  					log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.IssuePath)
  					globalIndexer.Store(dummyIndexer)
  					log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err)
  				}
  			}()
  			issueIndexer = bleve.NewIndexer(setting.Indexer.IssuePath)
  			existed, err = issueIndexer.Init(ctx)
  			if err != nil {
  				log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err)
  			}
  		case "elasticsearch":
  			issueIndexer = elasticsearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
  			existed, err = issueIndexer.Init(ctx)
  			if err != nil {
  				log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
  			}
  		case "db":
  			// Init is not called for the db indexer, so existed stays false and the
  			// population step below still runs.
  			issueIndexer = db.NewIndexer()
  		case "meilisearch":
  			issueIndexer = meilisearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
  			existed, err = issueIndexer.Init(ctx)
  			if err != nil {
  				log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
  			}
  		default:
  			log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
  		}
  		// Publish the real indexer; from here on *globalIndexer.Load() returns it.
  		globalIndexer.Store(&issueIndexer)
  		graceful.GetManager().RunAtTerminate(func() {
  			log.Debug("Closing issue indexer")
  			(*globalIndexer.Load()).Close()
  			log.Info("PID: %d Issue Indexer closed", os.Getpid())
  		})
  		// Start processing the queue
  		go graceful.GetManager().RunWithCancel(issueIndexerQueue)
  		// Populate the index only if it did not exist before. With syncReindex the
  		// populator is invoked without `go`, so it presumably finishes before the
  		// duration is reported below.
  		if !existed {
  			if syncReindex {
  				graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
  			} else {
  				go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
  			}
  		}
  		indexerInitWaitChannel <- time.Since(start)
  		close(indexerInitWaitChannel)
  	}()
  	if syncReindex {
  		// Block until the init goroutine reports completion, or shutdown begins.
  		select {
  		case <-indexerInitWaitChannel:
  		case <-graceful.GetManager().IsShutdown():
  		}
  	} else if setting.Indexer.StartupTimeout > 0 {
  		// Watchdog: abort the process if initialization exceeds the configured timeout.
  		go func() {
  			pprof.SetGoroutineLabels(ctx)
  			timeout := setting.Indexer.StartupTimeout
  			// When graceful.GetManager().IsChild() is true, GracefulHammerTime is added on top.
  			if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
  				timeout += setting.GracefulHammerTime
  			}
  			select {
  			case duration := <-indexerInitWaitChannel:
  				log.Info("Issue Indexer Initialization took %v", duration)
  			case <-graceful.GetManager().IsShutdown():
  				log.Warn("Shutdown occurred before issue index initialisation was complete")
  			case <-time.After(timeout):
  				// Shut the queue down (ShutdownWait with a 5s limit) before aborting.
  				issueIndexerQueue.ShutdownWait(5 * time.Second)
  				log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
  			}
  		}()
  	}
  }
  // getIssueIndexerQueueHandler returns the handler that processes batches from the
  // issue indexer queue.
  //
  // Each item either deletes the issues listed in IDs from the index (IsDelete), or
  // (re)indexes the single issue ID. In the second case, if the issue no longer exists
  // in the database, its index entry is deleted instead. Items that fail are returned
  // as unhandled (presumably re-queued by the WorkerPoolQueue — verify).
  func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMetadata) []*IndexerMetadata {
  	return func(items ...*IndexerMetadata) []*IndexerMetadata {
  		var unhandled []*IndexerMetadata
  		// Load once per batch so every item in the batch goes to the same indexer.
  		indexer := *globalIndexer.Load()
  		for _, item := range items {
  			log.Trace("IndexerMetadata Process: %d %v %t", item.ID, item.IDs, item.IsDelete)
  			if item.IsDelete {
  				if err := indexer.Delete(ctx, item.IDs...); err != nil {
  					log.Error("Issue indexer handler: failed to delete issues %v from index: %v", item.IDs, err)
  					unhandled = append(unhandled, item)
  				}
  				continue
  			}

  			data, existed, err := getIssueIndexerData(ctx, item.ID)
  			if err != nil {
  				log.Error("Issue indexer handler: failed to get issue data of %d: %v", item.ID, err)
  				unhandled = append(unhandled, item)
  				continue
  			}

  			if !existed {
  				// The issue is gone from the database: drop its stale index entry.
  				if err := indexer.Delete(ctx, item.ID); err != nil {
  					log.Error("Issue indexer handler: failed to delete issue %d from index: %v", item.ID, err)
  					unhandled = append(unhandled, item)
  				}
  				continue
  			}

  			if err := indexer.Index(ctx, data); err != nil {
  				log.Error("Issue indexer handler: failed to index issue %d: %v", item.ID, err)
  				unhandled = append(unhandled, item)
  			}
  		}
  		return unhandled
  	}
  }
  // populateIssueIndexer is the background wrapper around PopulateIssueIndexer. It
  // registers a "Service: PopulateIssueIndexer" process entry, switches the context to
  // keep-retry mode, and logs any population error rather than returning it.
  func populateIssueIndexer(ctx context.Context) {
  	procCtx, _, done := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true)
  	defer done()

  	// Keep retrying, since this runs as a background task.
  	retryCtx := contextWithKeepRetry(procCtx)
  	err := PopulateIssueIndexer(retryCtx)
  	if err != nil {
  		log.Error("Issue indexer population failed: %v", err)
  	}
  }
  // PopulateIssueIndexer walks the repositories page by page, ordered by ID, and pushes
  // all issues of each repository to the issue indexer via updateRepoIndexer.
  //
  // It returns nil once an empty page is reached. It returns an error when ctx is done,
  // when a page of repositories cannot be listed, or when a repository's issues cannot
  // be pushed. Errors are wrapped with %w so callers can inspect them with errors.Is/As.
  func PopulateIssueIndexer(ctx context.Context) error {
  	for page := 1; ; page++ {
  		select {
  		case <-ctx.Done():
  			return fmt.Errorf("shutdown before completion: %w", ctx.Err())
  		default:
  		}
  		repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{
  			ListOptions: db_model.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize},
  			OrderBy:     db_model.SearchOrderByID,
  			Private:     true,
  			Collaborate: optional.Some(false),
  		})
  		if err != nil {
  			// Do not skip to the next page. That would silently leave this page's
  			// repositories unindexed, and a persistent error would make the loop spin
  			// through page numbers forever without ever seeing an empty page.
  			return fmt.Errorf("search repositories (page %d): %w", page, err)
  		}
  		if len(repos) == 0 {
  			log.Debug("Issue Indexer population complete")
  			return nil
  		}
  		for _, repo := range repos {
  			if err := updateRepoIndexer(ctx, repo.ID); err != nil {
  				return fmt.Errorf("populate issue indexer for repo %d: %w", repo.ID, err)
  			}
  		}
  	}
  }
  // UpdateRepoIndexer adds or updates all issues of the given repository in the issue
  // indexer. Failures are logged, not returned.
  func UpdateRepoIndexer(ctx context.Context, repoID int64) {
  	err := updateRepoIndexer(ctx, repoID)
  	if err == nil {
  		return
  	}
  	log.Error("Unable to push repo %d to issue indexer: %v", repoID, err)
  }
  // UpdateIssueIndexer adds or updates a single issue in the issue indexer.
  // Failures are logged, not returned.
  func UpdateIssueIndexer(ctx context.Context, issueID int64) {
  	err := updateIssueIndexer(ctx, issueID)
  	if err == nil {
  		return
  	}
  	log.Error("Unable to push issue %d to issue indexer: %v", issueID, err)
  }
  // DeleteRepoIssueIndexer deletes the index entries of all issues of the given
  // repository. Failures are logged, not returned.
  func DeleteRepoIssueIndexer(ctx context.Context, repoID int64) {
  	err := deleteRepoIssueIndexer(ctx, repoID)
  	if err == nil {
  		return
  	}
  	log.Error("Unable to push deleted repo %d to issue indexer: %v", repoID, err)
  }
  // IsAvailable reports whether the current global issue indexer answers a ping
  // without error. Before initialization finishes, this is the dummy indexer, which
  // is documented to return errors while not ready.
  func IsAvailable(ctx context.Context) bool {
  	current := globalIndexer.Load()
  	pingErr := (*current).Ping(ctx)
  	return pingErr == nil
  }
  // SearchOptions indicates the options for searching issues.
  // It is a type alias of internal.SearchOptions, so the two types are interchangeable.
  type SearchOptions = internal.SearchOptions

  // Sort orders for issue searches, re-exported from the internal package so callers do
  // not need to import it (presumably consumed via SearchOptions.SortBy — verify).
  const (
  	SortByCreatedDesc  = internal.SortByCreatedDesc
  	SortByUpdatedDesc  = internal.SortByUpdatedDesc
  	SortByCommentsDesc = internal.SortByCommentsDesc
  	SortByDeadlineDesc = internal.SortByDeadlineDesc
  	SortByCreatedAsc   = internal.SortByCreatedAsc
  	SortByUpdatedAsc   = internal.SortByUpdatedAsc
  	SortByCommentsAsc  = internal.SortByCommentsAsc
  	SortByDeadlineAsc  = internal.SortByDeadlineAsc
  )
  252. // SearchIssues search issues by options.
  253. func SearchIssues(ctx context.Context, opts *SearchOptions) ([]int64, int64, error) {
  254. indexer := *globalIndexer.Load()
  255. if opts.Keyword == "" {
  256. // This is a conservative shortcut.
  257. // If the keyword is empty, db has better (at least not worse) performance to filter issues.
  258. // When the keyword is empty, it tends to listing rather than searching issues.
  259. // So if the user creates an issue and list issues immediately, the issue may not be listed because the indexer needs time to index the issue.
  260. // Even worse, the external indexer like elastic search may not be available for a while,
  261. // and the user may not be able to list issues completely until it is available again.
  262. indexer = db.NewIndexer()
  263. }
  264. result, err := indexer.Search(ctx, opts)
  265. if err != nil {
  266. return nil, 0, err
  267. }
  268. ret := make([]int64, 0, len(result.Hits))
  269. for _, hit := range result.Hits {
  270. ret = append(ret, hit.ID)
  271. }
  272. return ret, result.Total, nil
  273. }
  // CountIssues returns only the total number of issues matching opts.
  // It is a shortcut for SearchIssues(ctx, opts) that discards the hit IDs.
  func CountIssues(ctx context.Context, opts *SearchOptions) (int64, error) {
  	// Search with a paginator whose PageSize is 0, since only the total is needed.
  	// opts.Copy presumably clones opts before applying the change, leaving the
  	// caller's options untouched.
  	countOpts := opts.Copy(func(o *SearchOptions) {
  		o.Paginator = &db_model.ListOptions{PageSize: 0}
  	})
  	_, total, err := SearchIssues(ctx, countOpts)
  	return total, err
  }