From 0751153613bfd2e39cf28e83bbe04b76641d222f Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Thu, 21 Feb 2019 08:54:05 +0800 Subject: refactor issue indexer, add some testing and fix a bug (#6131) * refactor issue indexer, add some testing and fix a bug * fix error copyright year on comment header * issues indexer package import keep consistent --- .gitignore | 1 + models/issue.go | 5 ++ models/issue_indexer.go | 148 -------------------------------- models/unit_tests.go | 4 - modules/indexer/issues/indexer.go | 148 ++++++++++++++++++++++++++++++++ modules/indexer/issues/indexer_test.go | 51 +++++++++++ modules/indexer/issues/queue_disk.go | 19 ++-- modules/notification/indexer/indexer.go | 17 ++-- routers/api/v1/repo/issue.go | 3 +- routers/init.go | 3 +- routers/repo/issue.go | 3 +- 11 files changed, 231 insertions(+), 171 deletions(-) delete mode 100644 models/issue_indexer.go create mode 100644 modules/indexer/issues/indexer_test.go diff --git a/.gitignore b/.gitignore index 941ec41e04..2fe0134f7d 100644 --- a/.gitignore +++ b/.gitignore @@ -62,6 +62,7 @@ coverage.all /integrations/pgsql.ini /integrations/mssql.ini /node_modules +/modules/indexer/issues/indexers # Snapcraft diff --git a/models/issue.go b/models/issue.go index 835c6cf9fc..8de26c2b15 100644 --- a/models/issue.go +++ b/models/issue.go @@ -1231,6 +1231,11 @@ func getIssueIDsByRepoID(e Engine, repoID int64) ([]int64, error) { return ids, err } +// GetIssueIDsByRepoID returns all issue ids by repo id +func GetIssueIDsByRepoID(repoID int64) ([]int64, error) { + return getIssueIDsByRepoID(x, repoID) +} + // GetIssuesByIDs return issues with the given IDs. func GetIssuesByIDs(issueIDs []int64) ([]*Issue, error) { return getIssuesByIDs(x, issueIDs) diff --git a/models/issue_indexer.go b/models/issue_indexer.go deleted file mode 100644 index d02b7164da..0000000000 --- a/models/issue_indexer.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2017 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package models - -import ( - "fmt" - - "code.gitea.io/gitea/modules/indexer/issues" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/util" -) - -var ( - // issueIndexerUpdateQueue queue of issue ids to be updated - issueIndexerUpdateQueue issues.Queue - issueIndexer issues.Indexer -) - -// InitIssueIndexer initialize issue indexer -func InitIssueIndexer() error { - var populate bool - switch setting.Indexer.IssueType { - case "bleve": - issueIndexer = issues.NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() - if err != nil { - return err - } - populate = !exist - default: - return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) - } - - var err error - switch setting.Indexer.IssueIndexerQueueType { - case setting.LevelQueueType: - issueIndexerUpdateQueue, err = issues.NewLevelQueue( - issueIndexer, - setting.Indexer.IssueIndexerQueueDir, - setting.Indexer.IssueIndexerQueueBatchNumber) - if err != nil { - return err - } - case setting.ChannelQueueType: - issueIndexerUpdateQueue = issues.NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber) - default: - return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType) - } - - go issueIndexerUpdateQueue.Run() - - if populate { - go populateIssueIndexer() - } - - return nil -} - -// populateIssueIndexer populate the issue indexer with issue data -func populateIssueIndexer() { - for page := 1; ; page++ { - repos, _, err := SearchRepositoryByName(&SearchRepoOptions{ - Page: page, - PageSize: RepositoryListDefaultPageSize, - OrderBy: SearchOrderByID, - Private: true, - Collaborate: util.OptionalBoolFalse, - }) - if err != nil { - log.Error(4, "SearchRepositoryByName: %v", err) - continue - } - if len(repos) == 0 { - return - } - - for _, repo := range repos { - is, err := Issues(&IssuesOptions{ - RepoIDs: []int64{repo.ID}, - IsClosed: util.OptionalBoolNone, - IsPull: util.OptionalBoolNone, - }) - if err != nil { - log.Error(4, "Issues: %v", err) - continue - } - if err = IssueList(is).LoadDiscussComments(); err != nil { - log.Error(4, "LoadComments: %v", err) - continue - } - for _, issue := range is { - UpdateIssueIndexer(issue) - } - } - } -} - -// UpdateIssueIndexer add/update an issue to the issue indexer -func UpdateIssueIndexer(issue *Issue) { - var comments []string - for _, comment := range issue.Comments { - if comment.Type == CommentTypeComment { - comments = append(comments, comment.Content) - } - } - issueIndexerUpdateQueue.Push(&issues.IndexerData{ - ID: issue.ID, - RepoID: issue.RepoID, - Title: issue.Title, - Content: issue.Content, - Comments: comments, - }) -} - -// DeleteRepoIssueIndexer deletes repo's all issues indexes -func DeleteRepoIssueIndexer(repo *Repository) { - var ids []int64 - ids, err := getIssueIDsByRepoID(x, repo.ID) - if err != nil { - log.Error(4, "getIssueIDsByRepoID failed: %v", err) - return - } - - if len(ids) <= 0 { - return - } - - issueIndexerUpdateQueue.Push(&issues.IndexerData{ - IDs: ids, - IsDelete: true, - }) -} - -// SearchIssuesByKeyword search issue ids by keywords and repo id -func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { - var issueIDs []int64 - res, err := issueIndexer.Search(keyword, repoID, 1000, 0) - if err != nil { - return nil, err - } - for _, r := range res.Hits { - issueIDs = append(issueIDs, r.ID) - } - return issueIDs, nil -} diff --git a/models/unit_tests.go b/models/unit_tests.go index f87dd7ee96..28cd91215e 100644 --- a/models/unit_tests.go +++ b/models/unit_tests.go @@ -44,10 +44,6 @@ func MainTest(m *testing.M, pathToGiteaRoot string) { fatalTestError("Error creating test engine: %v\n", err) } - if err = InitIssueIndexer(); err != nil { - fatalTestError("Error InitIssueIndexer: %v\n", err) - } - setting.AppURL = "https://try.gitea.io/" setting.RunUser = "runuser" setting.SSH.Port = 3000 diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index c31006d0dd..41af5c36b6 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -4,6 +4,15 @@ package issues +import ( + "fmt" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/util" +) + // IndexerData data stored in the issue indexer type IndexerData struct { ID int64 @@ -34,3 +43,142 @@ type Indexer interface { Delete(ids ...int64) error Search(kw string, repoID int64, limit, start int) (*SearchResult, error) } + +var ( + // issueIndexerUpdateQueue queue of issue ids to be updated + issueIndexerUpdateQueue Queue + issueIndexer Indexer +) + +// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until +// all issue index done. +func InitIssueIndexer(syncReindex bool) error { + var populate bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + return err + } + populate = !exist + default: + return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) + } + + var err error + switch setting.Indexer.IssueIndexerQueueType { + case setting.LevelQueueType: + issueIndexerUpdateQueue, err = NewLevelQueue( + issueIndexer, + setting.Indexer.IssueIndexerQueueDir, + setting.Indexer.IssueIndexerQueueBatchNumber) + if err != nil { + return err + } + case setting.ChannelQueueType: + issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber) + default: + return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType) + } + + go issueIndexerUpdateQueue.Run() + + if populate { + if syncReindex { + populateIssueIndexer() + } else { + go populateIssueIndexer() + } + } + + return nil +} + +// populateIssueIndexer populate the issue indexer with issue data +func populateIssueIndexer() { + for page := 1; ; page++ { + repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{ + Page: page, + PageSize: models.RepositoryListDefaultPageSize, + OrderBy: models.SearchOrderByID, + Private: true, + Collaborate: util.OptionalBoolFalse, + }) + if err != nil { + log.Error(4, "SearchRepositoryByName: %v", err) + continue + } + if len(repos) == 0 { + return + } + + for _, repo := range repos { + is, err := models.Issues(&models.IssuesOptions{ + RepoIDs: []int64{repo.ID}, + IsClosed: util.OptionalBoolNone, + IsPull: util.OptionalBoolNone, + }) + if err != nil { + log.Error(4, "Issues: %v", err) + continue + } + if err = models.IssueList(is).LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments: %v", err) + continue + } + for _, issue := range is { + UpdateIssueIndexer(issue) + } + } + } +} + +// UpdateIssueIndexer add/update an issue to the issue indexer +func UpdateIssueIndexer(issue *models.Issue) { + var comments []string + for _, comment := range issue.Comments { + if comment.Type == models.CommentTypeComment { + comments = append(comments, comment.Content) + } + } + issueIndexerUpdateQueue.Push(&IndexerData{ + ID: issue.ID, + RepoID: issue.RepoID, + Title: issue.Title, + Content: issue.Content, + Comments: comments, + }) +} + +// DeleteRepoIssueIndexer deletes repo's all issues indexes +func DeleteRepoIssueIndexer(repo *models.Repository) { + var ids []int64 + ids, err := models.GetIssueIDsByRepoID(repo.ID) + if err != nil { + log.Error(4, "getIssueIDsByRepoID failed: %v", err) + return + } + + if len(ids) <= 0 { + return + } + + issueIndexerUpdateQueue.Push(&IndexerData{ + IDs: ids, + IsDelete: true, + }) +} + +// SearchIssuesByKeyword search issue ids by keywords and repo id +func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { + var issueIDs []int64 + res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + if err != nil { + return nil, err + } + for _, r := range res.Hits { + issueIDs = append(issueIDs, r.ID) + } + return issueIDs, nil +} diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go new file mode 100644 index 0000000000..1b6bdec53e --- /dev/null +++ b/modules/indexer/issues/indexer_test.go @@ -0,0 +1,51 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/setting" + + "github.com/stretchr/testify/assert" +) + +func fatalTestError(fmtStr string, args ...interface{}) { + fmt.Fprintf(os.Stderr, fmtStr, args...) + os.Exit(1) +} + +func TestMain(m *testing.M) { + models.MainTest(m, filepath.Join("..", "..", "..")) +} + +func TestSearchIssues(t *testing.T) { + assert.NoError(t, models.PrepareTestDatabase()) + + os.RemoveAll(setting.Indexer.IssueIndexerQueueDir) + os.RemoveAll(setting.Indexer.IssuePath) + if err := InitIssueIndexer(true); err != nil { + fatalTestError("Error InitIssueIndexer: %v\n", err) + } + + time.Sleep(10 * time.Second) + + ids, err := SearchIssuesByKeyword(1, "issue2") + assert.NoError(t, err) + assert.EqualValues(t, []int64{2}, ids) + + ids, err = SearchIssuesByKeyword(1, "first") + assert.NoError(t, err) + assert.EqualValues(t, []int64{1}, ids) + + ids, err = SearchIssuesByKeyword(1, "for") + assert.NoError(t, err) + assert.EqualValues(t, []int64{1, 2, 3, 5}, ids) +} diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go index 97e9a3d965..97bacdf99d 100644 --- a/modules/indexer/issues/queue_disk.go +++ b/modules/indexer/issues/queue_disk.go @@ -42,18 +42,21 @@ func (l *LevelQueue) Run() error { var i int var datas = make([]*IndexerData, 0, l.batchNumber) for { - bs, err := l.queue.RPop() - if err != nil { - log.Error(4, "RPop: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - i++ if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { l.indexer.Index(datas) datas = make([]*IndexerData, 0, l.batchNumber) i = 0 + continue + } + + bs, err := l.queue.RPop() + if err != nil { + if err != levelqueue.ErrNotFound { + log.Error(4, "RPop: %v", err) + } + time.Sleep(time.Millisecond * 100) + continue } if len(bs) <= 0 { @@ -69,7 +72,7 @@ func (l *LevelQueue) Run() error { continue } - log.Trace("LedisLocalQueue: task found: %#v", data) + log.Trace("LevelQueue: task found: %#v", data) if data.IsDelete { if data.ID > 0 { diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go index 66d483c017..45752e411a 100644 --- a/modules/notification/indexer/indexer.go +++ b/modules/notification/indexer/indexer.go @@ -6,6 +6,7 @@ package indexer import ( "code.gitea.io/gitea/models" + issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification/base" ) @@ -35,16 +36,16 @@ func (r *indexerNotifier) NotifyCreateIssueComment(doer *models.User, repo *mode issue.Comments = append(issue.Comments, comment) } - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } } func (r *indexerNotifier) NotifyNewIssue(issue *models.Issue) { - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyNewPullRequest(pr *models.PullRequest) { - models.UpdateIssueIndexer(pr.Issue) + issue_indexer.UpdateIssueIndexer(pr.Issue) } func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comment, oldContent string) { @@ -67,7 +68,7 @@ func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comme } } - models.UpdateIssueIndexer(c.Issue) + issue_indexer.UpdateIssueIndexer(c.Issue) } } @@ -91,18 +92,18 @@ func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models } } // reload comments to delete the old comment - models.UpdateIssueIndexer(comment.Issue) + issue_indexer.UpdateIssueIndexer(comment.Issue) } } func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) { - models.DeleteRepoIssueIndexer(repo) + issue_indexer.DeleteRepoIssueIndexer(repo) } func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) { - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) { - models.UpdateIssueIndexer(issue) + issue_indexer.UpdateIssueIndexer(issue) } diff --git a/routers/api/v1/repo/issue.go b/routers/api/v1/repo/issue.go index a129447c09..e63db4b6a4 100644 --- a/routers/api/v1/repo/issue.go +++ b/routers/api/v1/repo/issue.go @@ -13,6 +13,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/context" + issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" @@ -77,7 +78,7 @@ func ListIssues(ctx *context.APIContext) { var labelIDs []int64 var err error if len(keyword) > 0 { - issueIDs, err = models.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword) + issueIDs, err = issue_indexer.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword) } if splitted := strings.Split(ctx.Query("labels"), ","); len(splitted) > 0 { diff --git a/routers/init.go b/routers/init.go index 1da21a351b..2fc3bb9668 100644 --- a/routers/init.go +++ b/routers/init.go @@ -15,6 +15,7 @@ import ( "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/cron" "code.gitea.io/gitea/modules/highlight" + issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/mailer" "code.gitea.io/gitea/modules/markup" @@ -90,7 +91,7 @@ func GlobalInit() { // Booting long running goroutines. cron.NewContext() - if err := models.InitIssueIndexer(); err != nil { + if err := issue_indexer.InitIssueIndexer(false); err != nil { log.Fatal(4, "Failed to initialize issue indexer: %v", err) } models.InitRepoIndexer() diff --git a/routers/repo/issue.go b/routers/repo/issue.go index f0a6d1bd5f..33d3ec6a74 100644 --- a/routers/repo/issue.go +++ b/routers/repo/issue.go @@ -23,6 +23,7 @@ import ( "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" + issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/markup/markdown" "code.gitea.io/gitea/modules/notification" @@ -146,7 +147,7 @@ func issues(ctx *context.Context, milestoneID int64, isPullOption util.OptionalB var issueIDs []int64 if len(keyword) > 0 { - issueIDs, err = models.SearchIssuesByKeyword(repo.ID, keyword) + issueIDs, err = issue_indexer.SearchIssuesByKeyword(repo.ID, keyword) if err != nil { ctx.ServerError("issueIndexer.Search", err) return -- cgit v1.2.3