diff options
Diffstat (limited to 'modules')
-rw-r--r-- | modules/indexer/issues/indexer.go | 148 | ||||
-rw-r--r-- | modules/indexer/issues/indexer_test.go | 51 | ||||
-rw-r--r-- | modules/indexer/issues/queue_disk.go | 19 | ||||
-rw-r--r-- | modules/notification/indexer/indexer.go | 17 |
4 files changed, 219 insertions, 16 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index c31006d0dd..41af5c36b6 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -4,6 +4,15 @@ package issues +import ( + "fmt" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/util" +) + // IndexerData data stored in the issue indexer type IndexerData struct { ID int64 @@ -34,3 +43,142 @@ type Indexer interface { Delete(ids ...int64) error Search(kw string, repoID int64, limit, start int) (*SearchResult, error) } + +var ( + // issueIndexerUpdateQueue queue of issue ids to be updated + issueIndexerUpdateQueue Queue + issueIndexer Indexer +) + +// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until +// all issue index done. +func InitIssueIndexer(syncReindex bool) error { + var populate bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + return err + } + populate = !exist + default: + return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) + } + + var err error + switch setting.Indexer.IssueIndexerQueueType { + case setting.LevelQueueType: + issueIndexerUpdateQueue, err = NewLevelQueue( + issueIndexer, + setting.Indexer.IssueIndexerQueueDir, + setting.Indexer.IssueIndexerQueueBatchNumber) + if err != nil { + return err + } + case setting.ChannelQueueType: + issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber) + default: + return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType) + } + + go issueIndexerUpdateQueue.Run() + + if populate { + if syncReindex { + populateIssueIndexer() + } else { + go populateIssueIndexer() + } + } + + return nil +} + +// populateIssueIndexer populate the issue indexer with issue data +func populateIssueIndexer() { + for page := 1; ; page++ { + repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{ + Page: page, + PageSize: models.RepositoryListDefaultPageSize, + OrderBy: models.SearchOrderByID, + Private: true, + Collaborate: util.OptionalBoolFalse, + }) + if err != nil { + log.Error(4, "SearchRepositoryByName: %v", err) + continue + } + if len(repos) == 0 { + return + } + + for _, repo := range repos { + is, err := models.Issues(&models.IssuesOptions{ + RepoIDs: []int64{repo.ID}, + IsClosed: util.OptionalBoolNone, + IsPull: util.OptionalBoolNone, + }) + if err != nil { + log.Error(4, "Issues: %v", err) + continue + } + if err = models.IssueList(is).LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments: %v", err) + continue + } + for _, issue := range is { + UpdateIssueIndexer(issue) + } + } + } +} + +// UpdateIssueIndexer add/update an issue to the issue indexer +func UpdateIssueIndexer(issue *models.Issue) { + var comments []string + for _, comment := range issue.Comments { + if comment.Type == models.CommentTypeComment { + comments = append(comments, comment.Content) + } + } + issueIndexerUpdateQueue.Push(&IndexerData{ + ID: issue.ID, + RepoID: issue.RepoID, + Title: issue.Title, + Content: issue.Content, + Comments: comments, + }) +} + +// DeleteRepoIssueIndexer deletes repo's all issues indexes +func DeleteRepoIssueIndexer(repo *models.Repository) { + var ids []int64 + ids, err := models.GetIssueIDsByRepoID(repo.ID) + if err != nil { + log.Error(4, "getIssueIDsByRepoID failed: %v", err) + return + } + + if len(ids) <= 0 { + return + } + + issueIndexerUpdateQueue.Push(&IndexerData{ + IDs: ids, + IsDelete: true, + }) +} + +// SearchIssuesByKeyword search issue ids by keywords and repo id +func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { + var issueIDs []int64 + res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + if err != nil { + return nil, err + } + for _, r := range res.Hits { + issueIDs = append(issueIDs, r.ID) + } + return issueIDs, nil +} diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go new file mode 100644 index 0000000000..1b6bdec53e --- /dev/null +++ b/modules/indexer/issues/indexer_test.go @@ -0,0 +1,51 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/setting" + + "github.com/stretchr/testify/assert" +) + +func fatalTestError(fmtStr string, args ...interface{}) { + fmt.Fprintf(os.Stderr, fmtStr, args...) + os.Exit(1) +} + +func TestMain(m *testing.M) { + models.MainTest(m, filepath.Join("..", "..", "..")) +} + +func TestSearchIssues(t *testing.T) { + assert.NoError(t, models.PrepareTestDatabase()) + + os.RemoveAll(setting.Indexer.IssueIndexerQueueDir) + os.RemoveAll(setting.Indexer.IssuePath) + if err := InitIssueIndexer(true); err != nil { + fatalTestError("Error InitIssueIndexer: %v\n", err) + } + + time.Sleep(10 * time.Second) + + ids, err := SearchIssuesByKeyword(1, "issue2") + assert.NoError(t, err) + assert.EqualValues(t, []int64{2}, ids) + + ids, err = SearchIssuesByKeyword(1, "first") + assert.NoError(t, err) + assert.EqualValues(t, []int64{1}, ids) + + ids, err = SearchIssuesByKeyword(1, "for") + assert.NoError(t, err) + assert.EqualValues(t, []int64{1, 2, 3, 5}, ids) +} diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go index 97e9a3d965..97bacdf99d 100644 --- a/modules/indexer/issues/queue_disk.go +++ b/modules/indexer/issues/queue_disk.go @@ -42,18 +42,21 @@ func (l *LevelQueue) Run() error { var i int var datas = make([]*IndexerData, 0, l.batchNumber) for { - bs, err := l.queue.RPop() - if err != nil { - log.Error(4, "RPop: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - i++ if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { l.indexer.Index(datas) datas = make([]*IndexerData, 0, l.batchNumber) i = 0 + continue + } + + bs, err := l.queue.RPop() + if err != nil { + if err != levelqueue.ErrNotFound { + log.Error(4, "RPop: %v", err) + } + time.Sleep(time.Millisecond * 100) + continue } if len(bs) <= 0 { @@ -69,7 +72,7 @@ func (l *LevelQueue) Run() error { continue } - log.Trace("LedisLocalQueue: task found: %#v", data) + log.Trace("LevelQueue: task found: %#v", data) if data.IsDelete { if data.ID > 0 { diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go index 66d483c017..45752e411a 100644 --- a/modules/notification/indexer/indexer.go +++ b/modules/notification/indexer/indexer.go @@ -6,6 +6,7 @@ package indexer import ( "code.gitea.io/gitea/models" + issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification/base" ) @@ -35,16 +36,16 @@ func (r *indexerNotifier) NotifyCreateIssueComment(doer *models.User, repo *mode issue.Comments = append(issue.Comments, comment) } - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } } func (r *indexerNotifier) NotifyNewIssue(issue *models.Issue) { - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyNewPullRequest(pr *models.PullRequest) { - models.UpdateIssueIndexer(pr.Issue) + issue_indexer.UpdateIssueIndexer(pr.Issue) } func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comment, oldContent string) { @@ -67,7 +68,7 @@ func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comme } } - models.UpdateIssueIndexer(c.Issue) + issue_indexer.UpdateIssueIndexer(c.Issue) } } @@ -91,18 +92,18 @@ func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models } } // reload comments to delete the old comment - models.UpdateIssueIndexer(comment.Issue) + issue_indexer.UpdateIssueIndexer(comment.Issue) } } func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) { - models.DeleteRepoIssueIndexer(repo) + issue_indexer.DeleteRepoIssueIndexer(repo) } func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) { - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) { - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } |