diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2019-02-21 08:54:05 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-21 08:54:05 +0800 |
commit | 0751153613bfd2e39cf28e83bbe04b76641d222f (patch) | |
tree | 91ec0e2bd81c9007f15f9ab255e177d2be138f24 /modules/indexer/issues | |
parent | eaf9ded18201d8ad2587860ed98763ca040f0b71 (diff) | |
download | gitea-0751153613bfd2e39cf28e83bbe04b76641d222f.tar.gz gitea-0751153613bfd2e39cf28e83bbe04b76641d222f.zip |
refactor issue indexer, add some testing and fix a bug (#6131)
* refactor issue indexer, add some testing and fix a bug
* fix error copyright year on comment header
* issues indexer package import keep consistent
Diffstat (limited to 'modules/indexer/issues')
-rw-r--r-- | modules/indexer/issues/indexer.go | 148 | ||||
-rw-r--r-- | modules/indexer/issues/indexer_test.go | 51 | ||||
-rw-r--r-- | modules/indexer/issues/queue_disk.go | 19 |
3 files changed, 210 insertions, 8 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index c31006d0dd..41af5c36b6 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -4,6 +4,15 @@ package issues +import ( + "fmt" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/util" +) + // IndexerData data stored in the issue indexer type IndexerData struct { ID int64 @@ -34,3 +43,142 @@ type Indexer interface { Delete(ids ...int64) error Search(kw string, repoID int64, limit, start int) (*SearchResult, error) } + +var ( + // issueIndexerUpdateQueue queue of issue ids to be updated + issueIndexerUpdateQueue Queue + issueIndexer Indexer +) + +// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until +// all issue index done. +func InitIssueIndexer(syncReindex bool) error { + var populate bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + return err + } + populate = !exist + default: + return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) + } + + var err error + switch setting.Indexer.IssueIndexerQueueType { + case setting.LevelQueueType: + issueIndexerUpdateQueue, err = NewLevelQueue( + issueIndexer, + setting.Indexer.IssueIndexerQueueDir, + setting.Indexer.IssueIndexerQueueBatchNumber) + if err != nil { + return err + } + case setting.ChannelQueueType: + issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber) + default: + return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType) + } + + go issueIndexerUpdateQueue.Run() + + if populate { + if syncReindex { + populateIssueIndexer() + } else { + go populateIssueIndexer() + } + } + + return nil +} + +// populateIssueIndexer populate the issue indexer with issue data +func populateIssueIndexer() { + for page := 1; ; page++ { + repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{ + Page: page, + PageSize: models.RepositoryListDefaultPageSize, + OrderBy: models.SearchOrderByID, + Private: true, + Collaborate: util.OptionalBoolFalse, + }) + if err != nil { + log.Error(4, "SearchRepositoryByName: %v", err) + continue + } + if len(repos) == 0 { + return + } + + for _, repo := range repos { + is, err := models.Issues(&models.IssuesOptions{ + RepoIDs: []int64{repo.ID}, + IsClosed: util.OptionalBoolNone, + IsPull: util.OptionalBoolNone, + }) + if err != nil { + log.Error(4, "Issues: %v", err) + continue + } + if err = models.IssueList(is).LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments: %v", err) + continue + } + for _, issue := range is { + UpdateIssueIndexer(issue) + } + } + } +} + +// UpdateIssueIndexer add/update an issue to the issue indexer +func UpdateIssueIndexer(issue *models.Issue) { + var comments []string + for _, comment := range issue.Comments { + if comment.Type == models.CommentTypeComment { + comments = append(comments, comment.Content) + } + } + issueIndexerUpdateQueue.Push(&IndexerData{ + ID: issue.ID, + RepoID: issue.RepoID, + Title: issue.Title, + Content: issue.Content, + Comments: comments, + }) +} + +// DeleteRepoIssueIndexer deletes repo's all issues indexes +func DeleteRepoIssueIndexer(repo *models.Repository) { + var ids []int64 + ids, err := models.GetIssueIDsByRepoID(repo.ID) + if err != nil { + log.Error(4, "getIssueIDsByRepoID failed: %v", err) + return + } + + if len(ids) <= 0 { + return + } + + issueIndexerUpdateQueue.Push(&IndexerData{ + IDs: ids, + IsDelete: true, + }) +} + +// SearchIssuesByKeyword search issue ids by keywords and repo id +func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { + var issueIDs []int64 + res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + if err != nil { + return nil, err + } + for _, r := range res.Hits { + issueIDs = append(issueIDs, r.ID) + } + return issueIDs, nil +} diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go new file mode 100644 index 0000000000..1b6bdec53e --- /dev/null +++ b/modules/indexer/issues/indexer_test.go @@ -0,0 +1,51 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/setting" + + "github.com/stretchr/testify/assert" +) + +func fatalTestError(fmtStr string, args ...interface{}) { + fmt.Fprintf(os.Stderr, fmtStr, args...) + os.Exit(1) +} + +func TestMain(m *testing.M) { + models.MainTest(m, filepath.Join("..", "..", "..")) +} + +func TestSearchIssues(t *testing.T) { + assert.NoError(t, models.PrepareTestDatabase()) + + os.RemoveAll(setting.Indexer.IssueIndexerQueueDir) + os.RemoveAll(setting.Indexer.IssuePath) + if err := InitIssueIndexer(true); err != nil { + fatalTestError("Error InitIssueIndexer: %v\n", err) + } + + time.Sleep(10 * time.Second) + + ids, err := SearchIssuesByKeyword(1, "issue2") + assert.NoError(t, err) + assert.EqualValues(t, []int64{2}, ids) + + ids, err = SearchIssuesByKeyword(1, "first") + assert.NoError(t, err) + assert.EqualValues(t, []int64{1}, ids) + + ids, err = SearchIssuesByKeyword(1, "for") + assert.NoError(t, err) + assert.EqualValues(t, []int64{1, 2, 3, 5}, ids) +} diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go index 97e9a3d965..97bacdf99d 100644 --- a/modules/indexer/issues/queue_disk.go +++ b/modules/indexer/issues/queue_disk.go @@ -42,18 +42,21 @@ func (l *LevelQueue) Run() error { var i int var datas = make([]*IndexerData, 0, l.batchNumber) for { - bs, err := l.queue.RPop() - if err != nil { - log.Error(4, "RPop: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - i++ if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { l.indexer.Index(datas) datas = make([]*IndexerData, 0, l.batchNumber) i = 0 + continue + } + + bs, err := l.queue.RPop() + if err != nil { + if err != levelqueue.ErrNotFound { + log.Error(4, "RPop: %v", err) + } + time.Sleep(time.Millisecond * 100) + continue } if len(bs) <= 0 { @@ -69,7 +72,7 @@ func (l *LevelQueue) Run() error { continue } - log.Trace("LedisLocalQueue: task found: %#v", data) + log.Trace("LevelQueue: task found: %#v", data) if data.IsDelete { if data.ID > 0 { |