You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

indexer.go 9.6KB


  1. // Copyright 2018 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package issues
  5. import (
  6. "context"
  7. "fmt"
  8. "os"
  9. "sync"
  10. "time"
  11. "code.gitea.io/gitea/models"
  12. "code.gitea.io/gitea/modules/graceful"
  13. "code.gitea.io/gitea/modules/log"
  14. "code.gitea.io/gitea/modules/queue"
  15. "code.gitea.io/gitea/modules/setting"
  16. "code.gitea.io/gitea/modules/util"
  17. )
  // IndexerData is one unit of work pushed to the issue indexer queue.
//
// For an add/update, ID, RepoID, Title, Content and Comments describe a single
// issue. For a deletion, IsDelete is true and IDs lists the issue IDs to
// remove; the remaining fields are left empty.
type IndexerData struct {
	ID       int64    `json:"id"`
	RepoID   int64    `json:"repo_id"`
	Title    string   `json:"title"`
	Content  string   `json:"content"`
	Comments []string `json:"comments"`
	IsDelete bool     `json:"is_delete"`
	IDs      []int64  `json:"ids"`
}

// Match represents one search result.
type Match struct {
	ID    int64   `json:"id"`    // ID of the matching issue
	Score float64 `json:"score"` // score reported by the indexer (presumably relevance)
}

// SearchResult represents the results of an issue search.
type SearchResult struct {
	Total int64   // NOTE(review): presumably the total match count, which may exceed len(Hits)
	Hits  []Match // matches returned for the requested limit/start window
}

// Indexer defines the interface implemented by issue indexer backends.
type Indexer interface {
	// Init prepares the index. The returned bool reports whether the index
	// already existed; when it did not, InitIssueIndexer populates it.
	Init() (bool, error)
	// Index adds or updates the given issues in the index.
	Index(issues []*IndexerData) error
	// Delete removes the issues with the given IDs from the index.
	Delete(ids ...int64) error
	// Search looks up kw within the issues of repoIDs. limit and start
	// presumably act as page size and offset.
	Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
	// Close releases the resources held by the indexer.
	Close()
}
  46. type indexerHolder struct {
  47. indexer Indexer
  48. mutex sync.RWMutex
  49. cond *sync.Cond
  50. cancelled bool
  51. }
  52. func newIndexerHolder() *indexerHolder {
  53. h := &indexerHolder{}
  54. h.cond = sync.NewCond(h.mutex.RLocker())
  55. return h
  56. }
  57. func (h *indexerHolder) cancel() {
  58. h.mutex.Lock()
  59. defer h.mutex.Unlock()
  60. h.cancelled = true
  61. h.cond.Broadcast()
  62. }
  63. func (h *indexerHolder) set(indexer Indexer) {
  64. h.mutex.Lock()
  65. defer h.mutex.Unlock()
  66. h.indexer = indexer
  67. h.cond.Broadcast()
  68. }
  69. func (h *indexerHolder) get() Indexer {
  70. h.mutex.RLock()
  71. defer h.mutex.RUnlock()
  72. if h.indexer == nil && !h.cancelled {
  73. h.cond.Wait()
  74. }
  75. return h.indexer
  76. }
  77. var (
  78. // issueIndexerQueue queue of issue ids to be updated
  79. issueIndexerQueue queue.Queue
  80. holder = newIndexerHolder()
  81. )
  82. // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
  83. // all issue index done.
  84. func InitIssueIndexer(syncReindex bool) {
  85. waitChannel := make(chan time.Duration)
  86. // Create the Queue
  87. switch setting.Indexer.IssueType {
  88. case "bleve", "elasticsearch":
  89. handler := func(data ...queue.Data) {
  90. indexer := holder.get()
  91. if indexer == nil {
  92. log.Error("Issue indexer handler: unable to get indexer!")
  93. return
  94. }
  95. iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
  96. for _, datum := range data {
  97. indexerData, ok := datum.(*IndexerData)
  98. if !ok {
  99. log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
  100. continue
  101. }
  102. log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
  103. if indexerData.IsDelete {
  104. _ = indexer.Delete(indexerData.IDs...)
  105. continue
  106. }
  107. iData = append(iData, indexerData)
  108. }
  109. if err := indexer.Index(iData); err != nil {
  110. log.Error("Error whilst indexing: %v Error: %v", iData, err)
  111. }
  112. }
  113. issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
  114. if issueIndexerQueue == nil {
  115. log.Fatal("Unable to create issue indexer queue")
  116. }
  117. default:
  118. issueIndexerQueue = &queue.DummyQueue{}
  119. }
  120. // Create the Indexer
  121. go func() {
  122. start := time.Now()
  123. log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
  124. var populate bool
  125. switch setting.Indexer.IssueType {
  126. case "bleve":
  127. defer func() {
  128. if err := recover(); err != nil {
  129. log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2))
  130. log.Error("The indexer files are likely corrupted and may need to be deleted")
  131. log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.IssuePath)
  132. holder.cancel()
  133. log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err)
  134. }
  135. }()
  136. issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
  137. exist, err := issueIndexer.Init()
  138. if err != nil {
  139. holder.cancel()
  140. log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err)
  141. }
  142. populate = !exist
  143. holder.set(issueIndexer)
  144. graceful.GetManager().RunAtTerminate(context.Background(), func() {
  145. log.Debug("Closing issue indexer")
  146. issueIndexer := holder.get()
  147. if issueIndexer != nil {
  148. issueIndexer.Close()
  149. }
  150. log.Info("PID: %d Issue Indexer closed", os.Getpid())
  151. })
  152. log.Debug("Created Bleve Indexer")
  153. case "elasticsearch":
  154. graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
  155. issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
  156. if err != nil {
  157. log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
  158. }
  159. exist, err := issueIndexer.Init()
  160. if err != nil {
  161. log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
  162. }
  163. populate = !exist
  164. holder.set(issueIndexer)
  165. })
  166. case "db":
  167. issueIndexer := &DBIndexer{}
  168. holder.set(issueIndexer)
  169. default:
  170. holder.cancel()
  171. log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
  172. }
  173. // Start processing the queue
  174. go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
  175. // Populate the index
  176. if populate {
  177. if syncReindex {
  178. graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
  179. } else {
  180. go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
  181. }
  182. }
  183. waitChannel <- time.Since(start)
  184. close(waitChannel)
  185. }()
  186. if syncReindex {
  187. select {
  188. case <-waitChannel:
  189. case <-graceful.GetManager().IsShutdown():
  190. }
  191. } else if setting.Indexer.StartupTimeout > 0 {
  192. go func() {
  193. timeout := setting.Indexer.StartupTimeout
  194. if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
  195. timeout += setting.GracefulHammerTime
  196. }
  197. select {
  198. case duration := <-waitChannel:
  199. log.Info("Issue Indexer Initialization took %v", duration)
  200. case <-graceful.GetManager().IsShutdown():
  201. log.Warn("Shutdown occurred before issue index initialisation was complete")
  202. case <-time.After(timeout):
  203. if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
  204. shutdownable.Terminate()
  205. }
  206. log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
  207. }
  208. }()
  209. }
  210. }
  211. // populateIssueIndexer populate the issue indexer with issue data
  212. func populateIssueIndexer(ctx context.Context) {
  213. for page := 1; ; page++ {
  214. select {
  215. case <-ctx.Done():
  216. log.Warn("Issue Indexer population shutdown before completion")
  217. return
  218. default:
  219. }
  220. repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
  221. ListOptions: models.ListOptions{Page: page, PageSize: models.RepositoryListDefaultPageSize},
  222. OrderBy: models.SearchOrderByID,
  223. Private: true,
  224. Collaborate: util.OptionalBoolFalse,
  225. })
  226. if err != nil {
  227. log.Error("SearchRepositoryByName: %v", err)
  228. continue
  229. }
  230. if len(repos) == 0 {
  231. log.Debug("Issue Indexer population complete")
  232. return
  233. }
  234. for _, repo := range repos {
  235. select {
  236. case <-ctx.Done():
  237. log.Info("Issue Indexer population shutdown before completion")
  238. return
  239. default:
  240. }
  241. UpdateRepoIndexer(repo)
  242. }
  243. }
  244. }
  245. // UpdateRepoIndexer add/update all issues of the repositories
  246. func UpdateRepoIndexer(repo *models.Repository) {
  247. is, err := models.Issues(&models.IssuesOptions{
  248. RepoIDs: []int64{repo.ID},
  249. IsClosed: util.OptionalBoolNone,
  250. IsPull: util.OptionalBoolNone,
  251. })
  252. if err != nil {
  253. log.Error("Issues: %v", err)
  254. return
  255. }
  256. if err = models.IssueList(is).LoadDiscussComments(); err != nil {
  257. log.Error("LoadComments: %v", err)
  258. return
  259. }
  260. for _, issue := range is {
  261. UpdateIssueIndexer(issue)
  262. }
  263. }
  264. // UpdateIssueIndexer add/update an issue to the issue indexer
  265. func UpdateIssueIndexer(issue *models.Issue) {
  266. var comments []string
  267. for _, comment := range issue.Comments {
  268. if comment.Type == models.CommentTypeComment {
  269. comments = append(comments, comment.Content)
  270. }
  271. }
  272. indexerData := &IndexerData{
  273. ID: issue.ID,
  274. RepoID: issue.RepoID,
  275. Title: issue.Title,
  276. Content: issue.Content,
  277. Comments: comments,
  278. }
  279. log.Debug("Adding to channel: %v", indexerData)
  280. if err := issueIndexerQueue.Push(indexerData); err != nil {
  281. log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
  282. }
  283. }
  284. // DeleteRepoIssueIndexer deletes repo's all issues indexes
  285. func DeleteRepoIssueIndexer(repo *models.Repository) {
  286. var ids []int64
  287. ids, err := models.GetIssueIDsByRepoID(repo.ID)
  288. if err != nil {
  289. log.Error("getIssueIDsByRepoID failed: %v", err)
  290. return
  291. }
  292. if len(ids) == 0 {
  293. return
  294. }
  295. indexerData := &IndexerData{
  296. IDs: ids,
  297. IsDelete: true,
  298. }
  299. if err := issueIndexerQueue.Push(indexerData); err != nil {
  300. log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
  301. }
  302. }
  303. // SearchIssuesByKeyword search issue ids by keywords and repo id
  304. // WARNNING: You have to ensure user have permission to visit repoIDs' issues
  305. func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
  306. var issueIDs []int64
  307. indexer := holder.get()
  308. if indexer == nil {
  309. log.Error("SearchIssuesByKeyword(): unable to get indexer!")
  310. return nil, fmt.Errorf("unable to get issue indexer")
  311. }
  312. res, err := indexer.Search(keyword, repoIDs, 50, 0)
  313. if err != nil {
  314. return nil, err
  315. }
  316. for _, r := range res.Hits {
  317. issueIDs = append(issueIDs, r.ID)
  318. }
  319. return issueIDs, nil
  320. }