Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

indexer.go 8.2KB


  1. // Copyright 2018 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package issues
  5. import (
  6. "context"
  7. "fmt"
  8. "os"
  9. "sync"
  10. "time"
  11. "code.gitea.io/gitea/models"
  12. "code.gitea.io/gitea/modules/graceful"
  13. "code.gitea.io/gitea/modules/log"
  14. "code.gitea.io/gitea/modules/queue"
  15. "code.gitea.io/gitea/modules/setting"
  16. "code.gitea.io/gitea/modules/util"
  17. )
  18. // IndexerData data stored in the issue indexer
  19. type IndexerData struct {
  20. ID int64
  21. RepoID int64
  22. Title string
  23. Content string
  24. Comments []string
  25. IsDelete bool
  26. IDs []int64
  27. }
  28. // Match represents on search result
  29. type Match struct {
  30. ID int64 `json:"id"`
  31. Score float64 `json:"score"`
  32. }
  33. // SearchResult represents search results
  34. type SearchResult struct {
  35. Total int64
  36. Hits []Match
  37. }
  38. // Indexer defines an interface to indexer issues contents
  39. type Indexer interface {
  40. Init() (bool, error)
  41. Index(issue []*IndexerData) error
  42. Delete(ids ...int64) error
  43. Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
  44. Close()
  45. }
  46. type indexerHolder struct {
  47. indexer Indexer
  48. mutex sync.RWMutex
  49. cond *sync.Cond
  50. cancelled bool
  51. }
  52. func newIndexerHolder() *indexerHolder {
  53. h := &indexerHolder{}
  54. h.cond = sync.NewCond(h.mutex.RLocker())
  55. return h
  56. }
  57. func (h *indexerHolder) cancel() {
  58. h.mutex.Lock()
  59. defer h.mutex.Unlock()
  60. h.cancelled = true
  61. h.cond.Broadcast()
  62. }
  63. func (h *indexerHolder) set(indexer Indexer) {
  64. h.mutex.Lock()
  65. defer h.mutex.Unlock()
  66. h.indexer = indexer
  67. h.cond.Broadcast()
  68. }
  69. func (h *indexerHolder) get() Indexer {
  70. h.mutex.RLock()
  71. defer h.mutex.RUnlock()
  72. if h.indexer == nil && !h.cancelled {
  73. h.cond.Wait()
  74. }
  75. return h.indexer
  76. }
  77. var (
  78. // issueIndexerQueue queue of issue ids to be updated
  79. issueIndexerQueue queue.Queue
  80. holder = newIndexerHolder()
  81. )
  82. // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
  83. // all issue index done.
  84. func InitIssueIndexer(syncReindex bool) {
  85. waitChannel := make(chan time.Duration)
  86. // Create the Queue
  87. switch setting.Indexer.IssueType {
  88. case "bleve":
  89. handler := func(data ...queue.Data) {
  90. indexer := holder.get()
  91. if indexer == nil {
  92. log.Error("Issue indexer handler: unable to get indexer!")
  93. return
  94. }
  95. iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
  96. for _, datum := range data {
  97. indexerData, ok := datum.(*IndexerData)
  98. if !ok {
  99. log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
  100. continue
  101. }
  102. log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
  103. if indexerData.IsDelete {
  104. _ = indexer.Delete(indexerData.IDs...)
  105. continue
  106. }
  107. iData = append(iData, indexerData)
  108. }
  109. if err := indexer.Index(iData); err != nil {
  110. log.Error("Error whilst indexing: %v Error: %v", iData, err)
  111. }
  112. }
  113. issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
  114. if issueIndexerQueue == nil {
  115. log.Fatal("Unable to create issue indexer queue")
  116. }
  117. default:
  118. issueIndexerQueue = &queue.DummyQueue{}
  119. }
  120. // Create the Indexer
  121. go func() {
  122. start := time.Now()
  123. log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
  124. var populate bool
  125. switch setting.Indexer.IssueType {
  126. case "bleve":
  127. issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
  128. exist, err := issueIndexer.Init()
  129. if err != nil {
  130. holder.cancel()
  131. log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
  132. }
  133. populate = !exist
  134. holder.set(issueIndexer)
  135. graceful.GetManager().RunAtTerminate(context.Background(), func() {
  136. log.Debug("Closing issue indexer")
  137. issueIndexer := holder.get()
  138. if issueIndexer != nil {
  139. issueIndexer.Close()
  140. }
  141. log.Info("PID: %d Issue Indexer closed", os.Getpid())
  142. })
  143. log.Debug("Created Bleve Indexer")
  144. case "db":
  145. issueIndexer := &DBIndexer{}
  146. holder.set(issueIndexer)
  147. default:
  148. holder.cancel()
  149. log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
  150. }
  151. // Start processing the queue
  152. go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
  153. // Populate the index
  154. if populate {
  155. if syncReindex {
  156. graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
  157. } else {
  158. go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
  159. }
  160. }
  161. waitChannel <- time.Since(start)
  162. close(waitChannel)
  163. }()
  164. if syncReindex {
  165. select {
  166. case <-waitChannel:
  167. case <-graceful.GetManager().IsShutdown():
  168. }
  169. } else if setting.Indexer.StartupTimeout > 0 {
  170. go func() {
  171. timeout := setting.Indexer.StartupTimeout
  172. if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
  173. timeout += setting.GracefulHammerTime
  174. }
  175. select {
  176. case duration := <-waitChannel:
  177. log.Info("Issue Indexer Initialization took %v", duration)
  178. case <-graceful.GetManager().IsShutdown():
  179. log.Warn("Shutdown occurred before issue index initialisation was complete")
  180. case <-time.After(timeout):
  181. if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
  182. shutdownable.Terminate()
  183. }
  184. log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
  185. }
  186. }()
  187. }
  188. }
  189. // populateIssueIndexer populate the issue indexer with issue data
  190. func populateIssueIndexer(ctx context.Context) {
  191. for page := 1; ; page++ {
  192. select {
  193. case <-ctx.Done():
  194. log.Warn("Issue Indexer population shutdown before completion")
  195. return
  196. default:
  197. }
  198. repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
  199. Page: page,
  200. PageSize: models.RepositoryListDefaultPageSize,
  201. OrderBy: models.SearchOrderByID,
  202. Private: true,
  203. Collaborate: util.OptionalBoolFalse,
  204. })
  205. if err != nil {
  206. log.Error("SearchRepositoryByName: %v", err)
  207. continue
  208. }
  209. if len(repos) == 0 {
  210. log.Debug("Issue Indexer population complete")
  211. return
  212. }
  213. for _, repo := range repos {
  214. select {
  215. case <-ctx.Done():
  216. log.Info("Issue Indexer population shutdown before completion")
  217. return
  218. default:
  219. }
  220. UpdateRepoIndexer(repo)
  221. }
  222. }
  223. }
  224. // UpdateRepoIndexer add/update all issues of the repositories
  225. func UpdateRepoIndexer(repo *models.Repository) {
  226. is, err := models.Issues(&models.IssuesOptions{
  227. RepoIDs: []int64{repo.ID},
  228. IsClosed: util.OptionalBoolNone,
  229. IsPull: util.OptionalBoolNone,
  230. })
  231. if err != nil {
  232. log.Error("Issues: %v", err)
  233. return
  234. }
  235. if err = models.IssueList(is).LoadDiscussComments(); err != nil {
  236. log.Error("LoadComments: %v", err)
  237. return
  238. }
  239. for _, issue := range is {
  240. UpdateIssueIndexer(issue)
  241. }
  242. }
  243. // UpdateIssueIndexer add/update an issue to the issue indexer
  244. func UpdateIssueIndexer(issue *models.Issue) {
  245. var comments []string
  246. for _, comment := range issue.Comments {
  247. if comment.Type == models.CommentTypeComment {
  248. comments = append(comments, comment.Content)
  249. }
  250. }
  251. indexerData := &IndexerData{
  252. ID: issue.ID,
  253. RepoID: issue.RepoID,
  254. Title: issue.Title,
  255. Content: issue.Content,
  256. Comments: comments,
  257. }
  258. log.Debug("Adding to channel: %v", indexerData)
  259. if err := issueIndexerQueue.Push(indexerData); err != nil {
  260. log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
  261. }
  262. }
  263. // DeleteRepoIssueIndexer deletes repo's all issues indexes
  264. func DeleteRepoIssueIndexer(repo *models.Repository) {
  265. var ids []int64
  266. ids, err := models.GetIssueIDsByRepoID(repo.ID)
  267. if err != nil {
  268. log.Error("getIssueIDsByRepoID failed: %v", err)
  269. return
  270. }
  271. if len(ids) == 0 {
  272. return
  273. }
  274. indexerData := &IndexerData{
  275. IDs: ids,
  276. IsDelete: true,
  277. }
  278. if err := issueIndexerQueue.Push(indexerData); err != nil {
  279. log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
  280. }
  281. }
  282. // SearchIssuesByKeyword search issue ids by keywords and repo id
  283. func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
  284. var issueIDs []int64
  285. indexer := holder.get()
  286. if indexer == nil {
  287. log.Error("SearchIssuesByKeyword(): unable to get indexer!")
  288. return nil, fmt.Errorf("unable to get issue indexer")
  289. }
  290. res, err := indexer.Search(keyword, repoIDs, 1000, 0)
  291. if err != nil {
  292. return nil, err
  293. }
  294. for _, r := range res.Hits {
  295. issueIDs = append(issueIDs, r.ID)
  296. }
  297. return issueIDs, nil
  298. }