summaryrefslogtreecommitdiffstats
path: root/modules/indexer
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2019-02-21 08:54:05 +0800
committerGitHub <noreply@github.com>2019-02-21 08:54:05 +0800
commit0751153613bfd2e39cf28e83bbe04b76641d222f (patch)
tree91ec0e2bd81c9007f15f9ab255e177d2be138f24 /modules/indexer
parenteaf9ded18201d8ad2587860ed98763ca040f0b71 (diff)
downloadgitea-0751153613bfd2e39cf28e83bbe04b76641d222f.tar.gz
gitea-0751153613bfd2e39cf28e83bbe04b76641d222f.zip
refactor issue indexer, add some testing and fix a bug (#6131)
* refactor issue indexer, add some testing and fix a bug * fix error copyright year on comment header * issues indexer package import keep consistent
Diffstat (limited to 'modules/indexer')
-rw-r--r--modules/indexer/issues/indexer.go148
-rw-r--r--modules/indexer/issues/indexer_test.go51
-rw-r--r--modules/indexer/issues/queue_disk.go19
3 files changed, 210 insertions, 8 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index c31006d0dd..41af5c36b6 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -4,6 +4,15 @@
package issues
+import (
+ "fmt"
+
+ "code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/util"
+)
+
// IndexerData data stored in the issue indexer
type IndexerData struct {
ID int64
@@ -34,3 +43,142 @@ type Indexer interface {
Delete(ids ...int64) error
Search(kw string, repoID int64, limit, start int) (*SearchResult, error)
}
+
+var (
+ // issueIndexerUpdateQueue queue of issue ids to be updated
+ issueIndexerUpdateQueue Queue
+ issueIndexer Indexer
+)
+
+// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
+// all issue index done.
+func InitIssueIndexer(syncReindex bool) error {
+ var populate bool
+ switch setting.Indexer.IssueType {
+ case "bleve":
+ issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath)
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ return err
+ }
+ populate = !exist
+ default:
+ return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
+ }
+
+ var err error
+ switch setting.Indexer.IssueIndexerQueueType {
+ case setting.LevelQueueType:
+ issueIndexerUpdateQueue, err = NewLevelQueue(
+ issueIndexer,
+ setting.Indexer.IssueIndexerQueueDir,
+ setting.Indexer.IssueIndexerQueueBatchNumber)
+ if err != nil {
+ return err
+ }
+ case setting.ChannelQueueType:
+ issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
+ default:
+ return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
+ }
+
+ go issueIndexerUpdateQueue.Run()
+
+ if populate {
+ if syncReindex {
+ populateIssueIndexer()
+ } else {
+ go populateIssueIndexer()
+ }
+ }
+
+ return nil
+}
+
+// populateIssueIndexer populate the issue indexer with issue data
+func populateIssueIndexer() {
+ for page := 1; ; page++ {
+ repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
+ Page: page,
+ PageSize: models.RepositoryListDefaultPageSize,
+ OrderBy: models.SearchOrderByID,
+ Private: true,
+ Collaborate: util.OptionalBoolFalse,
+ })
+ if err != nil {
+ log.Error(4, "SearchRepositoryByName: %v", err)
+ continue
+ }
+ if len(repos) == 0 {
+ return
+ }
+
+ for _, repo := range repos {
+ is, err := models.Issues(&models.IssuesOptions{
+ RepoIDs: []int64{repo.ID},
+ IsClosed: util.OptionalBoolNone,
+ IsPull: util.OptionalBoolNone,
+ })
+ if err != nil {
+ log.Error(4, "Issues: %v", err)
+ continue
+ }
+ if err = models.IssueList(is).LoadDiscussComments(); err != nil {
+ log.Error(4, "LoadComments: %v", err)
+ continue
+ }
+ for _, issue := range is {
+ UpdateIssueIndexer(issue)
+ }
+ }
+ }
+}
+
+// UpdateIssueIndexer add/update an issue to the issue indexer
+func UpdateIssueIndexer(issue *models.Issue) {
+ var comments []string
+ for _, comment := range issue.Comments {
+ if comment.Type == models.CommentTypeComment {
+ comments = append(comments, comment.Content)
+ }
+ }
+ issueIndexerUpdateQueue.Push(&IndexerData{
+ ID: issue.ID,
+ RepoID: issue.RepoID,
+ Title: issue.Title,
+ Content: issue.Content,
+ Comments: comments,
+ })
+}
+
+// DeleteRepoIssueIndexer deletes repo's all issues indexes
+func DeleteRepoIssueIndexer(repo *models.Repository) {
+ var ids []int64
+ ids, err := models.GetIssueIDsByRepoID(repo.ID)
+ if err != nil {
+ log.Error(4, "getIssueIDsByRepoID failed: %v", err)
+ return
+ }
+
+ if len(ids) <= 0 {
+ return
+ }
+
+ issueIndexerUpdateQueue.Push(&IndexerData{
+ IDs: ids,
+ IsDelete: true,
+ })
+}
+
+// SearchIssuesByKeyword search issue ids by keywords and repo id
+func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
+ var issueIDs []int64
+ res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
+ if err != nil {
+ return nil, err
+ }
+ for _, r := range res.Hits {
+ issueIDs = append(issueIDs, r.ID)
+ }
+ return issueIDs, nil
+}
diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go
new file mode 100644
index 0000000000..1b6bdec53e
--- /dev/null
+++ b/modules/indexer/issues/indexer_test.go
@@ -0,0 +1,51 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/setting"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func fatalTestError(fmtStr string, args ...interface{}) {
+ fmt.Fprintf(os.Stderr, fmtStr, args...)
+ os.Exit(1)
+}
+
+func TestMain(m *testing.M) {
+ models.MainTest(m, filepath.Join("..", "..", ".."))
+}
+
+func TestSearchIssues(t *testing.T) {
+ assert.NoError(t, models.PrepareTestDatabase())
+
+ os.RemoveAll(setting.Indexer.IssueIndexerQueueDir)
+ os.RemoveAll(setting.Indexer.IssuePath)
+ if err := InitIssueIndexer(true); err != nil {
+ fatalTestError("Error InitIssueIndexer: %v\n", err)
+ }
+
+ time.Sleep(10 * time.Second)
+
+ ids, err := SearchIssuesByKeyword(1, "issue2")
+ assert.NoError(t, err)
+ assert.EqualValues(t, []int64{2}, ids)
+
+ ids, err = SearchIssuesByKeyword(1, "first")
+ assert.NoError(t, err)
+ assert.EqualValues(t, []int64{1}, ids)
+
+ ids, err = SearchIssuesByKeyword(1, "for")
+ assert.NoError(t, err)
+ assert.EqualValues(t, []int64{1, 2, 3, 5}, ids)
+}
diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go
index 97e9a3d965..97bacdf99d 100644
--- a/modules/indexer/issues/queue_disk.go
+++ b/modules/indexer/issues/queue_disk.go
@@ -42,18 +42,21 @@ func (l *LevelQueue) Run() error {
var i int
var datas = make([]*IndexerData, 0, l.batchNumber)
for {
- bs, err := l.queue.RPop()
- if err != nil {
- log.Error(4, "RPop: %v", err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
i++
if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
l.indexer.Index(datas)
datas = make([]*IndexerData, 0, l.batchNumber)
i = 0
+ continue
+ }
+
+ bs, err := l.queue.RPop()
+ if err != nil {
+ if err != levelqueue.ErrNotFound {
+ log.Error(4, "RPop: %v", err)
+ }
+ time.Sleep(time.Millisecond * 100)
+ continue
}
if len(bs) <= 0 {
@@ -69,7 +72,7 @@ func (l *LevelQueue) Run() error {
continue
}
- log.Trace("LedisLocalQueue: task found: %#v", data)
+ log.Trace("LevelQueue: task found: %#v", data)
if data.IsDelete {
if data.ID > 0 {