diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2019-02-19 22:39:39 +0800 |
---|---|---|
committer | techknowlogick <matti@mdranta.net> | 2019-02-19 09:39:39 -0500 |
commit | 830ae614560b0c504c00d693b63d9889bac1a2d8 (patch) | |
tree | 5fd933f8124f4dd30d0215def2a7bcc0181573be /modules | |
parent | 094263db4d9f1b53c4b4c021005eec07baddd253 (diff) | |
download | gitea-830ae614560b0c504c00d693b63d9889bac1a2d8.tar.gz gitea-830ae614560b0c504c00d693b63d9889bac1a2d8.zip |
Refactor issue indexer (#5363)
Diffstat (limited to 'modules')
-rw-r--r-- | modules/indexer/issues/bleve.go | 250 | ||||
-rw-r--r-- | modules/indexer/issues/bleve_test.go | 88 | ||||
-rw-r--r-- | modules/indexer/issues/indexer.go | 36 | ||||
-rw-r--r-- | modules/indexer/issues/queue.go | 11 | ||||
-rw-r--r-- | modules/indexer/issues/queue_channel.go | 56 | ||||
-rw-r--r-- | modules/indexer/issues/queue_disk.go | 104 | ||||
-rw-r--r-- | modules/notification/indexer/indexer.go | 62 | ||||
-rw-r--r-- | modules/setting/indexer.go | 55 | ||||
-rw-r--r-- | modules/setting/setting.go | 11 |
9 files changed, 656 insertions, 17 deletions
diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go new file mode 100644 index 0000000000..36279198b8 --- /dev/null +++ b/modules/indexer/issues/bleve.go @@ -0,0 +1,250 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "fmt" + "os" + "strconv" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/custom" + "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/mapping" + "github.com/blevesearch/bleve/search/query" + "github.com/ethantkoenig/rupture" +) + +const ( + issueIndexerAnalyzer = "issueIndexer" + issueIndexerDocType = "issueIndexerDocType" + issueIndexerLatestVersion = 1 +) + +// indexerID a bleve-compatible unique identifier for an integer id +func indexerID(id int64) string { + return strconv.FormatInt(id, 36) +} + +// idOfIndexerID the integer id associated with an indexer id +func idOfIndexerID(indexerID string) (int64, error) { + id, err := strconv.ParseInt(indexerID, 36, 64) + if err != nil { + return 0, fmt.Errorf("Unexpected indexer ID %s: %v", indexerID, err) + } + return id, nil +} + +// numericEqualityQuery a numeric equality query for the given value and field +func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q +} + +func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhraseQuery { + q := bleve.NewMatchPhraseQuery(matchPhrase) + q.FieldVal = field + q.Analyzer = analyzer + return q +} + +const unicodeNormalizeName = "unicodeNormalize" + +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) +} + +const maxBatchSize = 16 + +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + metadata, err := rupture.ReadIndexMetadata(path) + if err != nil { + return nil, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) + } + + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err + } + + return index, nil +} + +// BleveIndexerData an update to the issue indexer +type BleveIndexerData IndexerData + +// Type returns the document type, for bleve's mapping.Classifier interface. +func (i *BleveIndexerData) Type() string { + return issueIndexerDocType +} + +// createIssueIndexer create an issue indexer if one does not already exist +func createIssueIndexer(path string, latestVersion int) (bleve.Index, error) { + mapping := bleve.NewIndexMapping() + docMapping := bleve.NewDocumentMapping() + + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.Store = false + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Title", textFieldMapping) + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + docMapping.AddFieldMappingsAt("Comments", textFieldMapping) + + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return nil, err + } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, lowercase.Name}, + }); err != nil { + return nil, err + } + + mapping.DefaultAnalyzer = issueIndexerAnalyzer + mapping.AddDocumentMapping(issueIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + + index, err := bleve.New(path, mapping) + if err != nil { + return nil, err + } + + if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + Version: latestVersion, + }); err != nil { + return nil, err + } + return index, nil +} + +var ( + _ Indexer = &BleveIndexer{} +) + +// BleveIndexer implements Indexer interface +type BleveIndexer struct { + indexDir string + indexer bleve.Index +} + +// NewBleveIndexer creates a new bleve local indexer +func NewBleveIndexer(indexDir string) *BleveIndexer { + return &BleveIndexer{ + indexDir: indexDir, + } +} + +// Init will initial the indexer +func (b *BleveIndexer) Init() (bool, error) { + var err error + b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion) + if err != nil { + return false, err + } + if b.indexer != nil { + return true, nil + } + + b.indexer, err = createIssueIndexer(b.indexDir, issueIndexerLatestVersion) + return false, err +} + +// Index will save the index data +func (b *BleveIndexer) Index(issues []*IndexerData) error { + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, issue := range issues { + if err := batch.Index(indexerID(issue.ID), struct { + RepoID int64 + Title string + Content string + Comments []string + }{ + RepoID: issue.RepoID, + Title: issue.Title, + Content: issue.Content, + Comments: issue.Comments, + }); err != nil { + return err + } + } + return batch.Flush() +} + +// Delete deletes indexes by ids +func (b *BleveIndexer) Delete(ids ...int64) error { + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, id := range ids { + if err := batch.Delete(indexerID(id)); err != nil { + return err + } + } + return batch.Flush() +} + +// Search searches for issues by given conditions. +// Returns the matching issue IDs +func (b *BleveIndexer) Search(keyword string, repoID int64, limit, start int) (*SearchResult, error) { + indexerQuery := bleve.NewConjunctionQuery( + numericEqualityQuery(repoID, "RepoID"), + bleve.NewDisjunctionQuery( + newMatchPhraseQuery(keyword, "Title", issueIndexerAnalyzer), + newMatchPhraseQuery(keyword, "Content", issueIndexerAnalyzer), + newMatchPhraseQuery(keyword, "Comments", issueIndexerAnalyzer), + )) + search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false) + + result, err := b.indexer.Search(search) + if err != nil { + return nil, err + } + + var ret = SearchResult{ + Hits: make([]Match, 0, len(result.Hits)), + } + for _, hit := range result.Hits { + id, err := idOfIndexerID(hit.ID) + if err != nil { + return nil, err + } + ret.Hits = append(ret.Hits, Match{ + ID: id, + RepoID: repoID, + }) + } + return &ret, nil +} diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve_test.go new file mode 100644 index 0000000000..720266e3b5 --- /dev/null +++ b/modules/indexer/issues/bleve_test.go @@ -0,0 +1,88 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexAndSearch(t *testing.T) { + dir := "./bleve.index" + indexer := NewBleveIndexer(dir) + defer os.RemoveAll(dir) + + _, err := indexer.Init() + assert.NoError(t, err) + + err = indexer.Index([]*IndexerData{ + { + ID: 1, + RepoID: 2, + Title: "Issue search should support Chinese", + Content: "As title", + Comments: []string{ + "test1", + "test2", + }, + }, + { + ID: 2, + RepoID: 2, + Title: "CJK support could be optional", + Content: "Chinese Korean and Japanese should be supported but I would like it's not enabled by default", + Comments: []string{ + "LGTM", + "Good idea", + }, + }, + }) + assert.NoError(t, err) + + var ( + keywords = []struct { + Keyword string + IDs []int64 + }{ + { + Keyword: "search", + IDs: []int64{1}, + }, + { + Keyword: "test1", + IDs: []int64{1}, + }, + { + Keyword: "test2", + IDs: []int64{1}, + }, + { + Keyword: "support", + IDs: []int64{1, 2}, + }, + { + Keyword: "chinese", + IDs: []int64{1, 2}, + }, + { + Keyword: "help", + IDs: []int64{}, + }, + } + ) + + for _, kw := range keywords { + res, err := indexer.Search(kw.Keyword, 2, 10, 0) + assert.NoError(t, err) + + var ids = make([]int64, 0, len(res.Hits)) + for _, hit := range res.Hits { + ids = append(ids, hit.ID) + } + assert.EqualValues(t, kw.IDs, ids) + } +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go new file mode 100644 index 0000000000..c31006d0dd --- /dev/null +++ b/modules/indexer/issues/indexer.go @@ -0,0 +1,36 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +// IndexerData data stored in the issue indexer +type IndexerData struct { + ID int64 + RepoID int64 + Title string + Content string + Comments []string + IsDelete bool + IDs []int64 +} + +// Match represents on search result +type Match struct { + ID int64 `json:"id"` + RepoID int64 `json:"repo_id"` + Score float64 `json:"score"` +} + +// SearchResult represents search results +type SearchResult struct { + Hits []Match +} + +// Indexer defines an inteface to indexer issues contents +type Indexer interface { + Init() (bool, error) + Index(issue []*IndexerData) error + Delete(ids ...int64) error + Search(kw string, repoID int64, limit, start int) (*SearchResult, error) +} diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go new file mode 100644 index 0000000000..6f4ee4c13a --- /dev/null +++ b/modules/indexer/issues/queue.go @@ -0,0 +1,11 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +// Queue defines an interface to save an issue indexer queue +type Queue interface { + Run() error + Push(*IndexerData) +} diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go new file mode 100644 index 0000000000..99a90ad499 --- /dev/null +++ b/modules/indexer/issues/queue_channel.go @@ -0,0 +1,56 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "time" + + "code.gitea.io/gitea/modules/setting" +) + +// ChannelQueue implements +type ChannelQueue struct { + queue chan *IndexerData + indexer Indexer + batchNumber int +} + +// NewChannelQueue create a memory channel queue +func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue { + return &ChannelQueue{ + queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength), + indexer: indexer, + batchNumber: batchNumber, + } +} + +// Run starts to run the queue +func (c *ChannelQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, c.batchNumber) + for { + select { + case data := <-c.queue: + datas = append(datas, data) + if len(datas) >= c.batchNumber { + c.indexer.Index(datas) + // TODO: save the point + datas = make([]*IndexerData, 0, c.batchNumber) + } + case <-time.After(time.Millisecond * 100): + i++ + if i >= 3 && len(datas) > 0 { + c.indexer.Index(datas) + // TODO: save the point + datas = make([]*IndexerData, 0, c.batchNumber) + } + } + } +} + +// Push will push the indexer data to queue +func (c *ChannelQueue) Push(data *IndexerData) { + c.queue <- data +} diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go new file mode 100644 index 0000000000..97e9a3d965 --- /dev/null +++ b/modules/indexer/issues/queue_disk.go @@ -0,0 +1,104 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "encoding/json" + "time" + + "code.gitea.io/gitea/modules/log" + "github.com/lunny/levelqueue" +) + +var ( + _ Queue = &LevelQueue{} +) + +// LevelQueue implements a disk library queue +type LevelQueue struct { + indexer Indexer + queue *levelqueue.Queue + batchNumber int +} + +// NewLevelQueue creates a ledis local queue +func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) { + queue, err := levelqueue.Open(dataDir) + if err != nil { + return nil, err + } + + return &LevelQueue{ + indexer: indexer, + queue: queue, + batchNumber: batchNumber, + }, nil +} + +// Run starts to run the queue +func (l *LevelQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, l.batchNumber) + for { + bs, err := l.queue.RPop() + if err != nil { + log.Error(4, "RPop: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + i++ + if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { + l.indexer.Index(datas) + datas = make([]*IndexerData, 0, l.batchNumber) + i = 0 + } + + if len(bs) <= 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data IndexerData + err = json.Unmarshal(bs, &data) + if err != nil { + log.Error(4, "Unmarshal: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("LedisLocalQueue: task found: %#v", data) + + if data.IsDelete { + if data.ID > 0 { + if err = l.indexer.Delete(data.ID); err != nil { + log.Error(4, "indexer.Delete: %v", err) + } + } else if len(data.IDs) > 0 { + if err = l.indexer.Delete(data.IDs...); err != nil { + log.Error(4, "indexer.Delete: %v", err) + } + } + time.Sleep(time.Millisecond * 10) + continue + } + + datas = append(datas, &data) + time.Sleep(time.Millisecond * 10) + } +} + +// Push will push the indexer data to queue +func (l *LevelQueue) Push(data *IndexerData) { + bs, err := json.Marshal(data) + if err != nil { + log.Error(4, "Marshal: %v", err) + return + } + err = l.queue.LPush(bs) + if err != nil { + log.Error(4, "LPush: %v", err) + } +} diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go index 3fd3352188..66d483c017 100644 --- a/modules/notification/indexer/indexer.go +++ b/modules/notification/indexer/indexer.go @@ -6,6 +6,7 @@ package indexer import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification/base" ) @@ -25,38 +26,83 @@ func NewNotifier() base.Notifier { func (r *indexerNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository, issue *models.Issue, comment *models.Comment) { if comment.Type == models.CommentTypeComment { - models.UpdateIssueIndexer(issue.ID) + if issue.Comments == nil { + if err := issue.LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments failed: %v", err) + return + } + } else { + issue.Comments = append(issue.Comments, comment) + } + + models.UpdateIssueIndexer(issue) } } func (r *indexerNotifier) NotifyNewIssue(issue *models.Issue) { - models.UpdateIssueIndexer(issue.ID) + models.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyNewPullRequest(pr *models.PullRequest) { - models.UpdateIssueIndexer(pr.Issue.ID) + models.UpdateIssueIndexer(pr.Issue) } func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comment, oldContent string) { if c.Type == models.CommentTypeComment { - models.UpdateIssueIndexer(c.IssueID) + var found bool + if c.Issue.Comments != nil { + for i := 0; i < len(c.Issue.Comments); i++ { + if c.Issue.Comments[i].ID == c.ID { + c.Issue.Comments[i] = c + found = true + break + } + } + } + + if !found { + if err := c.Issue.LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments failed: %v", err) + return + } + } + + models.UpdateIssueIndexer(c.Issue) } } func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models.Comment) { if comment.Type == models.CommentTypeComment { - models.UpdateIssueIndexer(comment.IssueID) + var found bool + if comment.Issue.Comments != nil { + for i := 0; i < len(comment.Issue.Comments); i++ { + if comment.Issue.Comments[i].ID == comment.ID { + comment.Issue.Comments = append(comment.Issue.Comments[:i], comment.Issue.Comments[i+1:]...) + found = true + break + } + } + } + + if !found { + if err := comment.Issue.LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments failed: %v", err) + return + } + } + // reload comments to delete the old comment + models.UpdateIssueIndexer(comment.Issue) } } func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) { - models.DeleteRepoFromIndexer(repo) + models.DeleteRepoIssueIndexer(repo) } func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) { - models.UpdateIssueIndexer(issue.ID) + models.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) { - models.UpdateIssueIndexer(issue.ID) + models.UpdateIssueIndexer(issue) } diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go new file mode 100644 index 0000000000..245ebb0496 --- /dev/null +++ b/modules/setting/indexer.go @@ -0,0 +1,55 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package setting + +import ( + "path" + "path/filepath" +) + +// enumerates all the indexer queue types +const ( + LevelQueueType = "levelqueue" + ChannelQueueType = "channel" +) + +var ( + // Indexer settings + Indexer = struct { + IssueType string + IssuePath string + RepoIndexerEnabled bool + RepoPath string + UpdateQueueLength int + MaxIndexerFileSize int64 + IssueIndexerQueueType string + IssueIndexerQueueDir string + IssueIndexerQueueBatchNumber int + }{ + IssueType: "bleve", + IssuePath: "indexers/issues.bleve", + IssueIndexerQueueType: LevelQueueType, + IssueIndexerQueueDir: "indexers/issues.queue", + IssueIndexerQueueBatchNumber: 20, + } +) + +func newIndexerService() { + sec := Cfg.Section("indexer") + Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/issues.bleve")) + if !filepath.IsAbs(Indexer.IssuePath) { + Indexer.IssuePath = path.Join(AppWorkPath, Indexer.IssuePath) + } + Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false) + Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve")) + if !filepath.IsAbs(Indexer.RepoPath) { + Indexer.RepoPath = path.Join(AppWorkPath, Indexer.RepoPath) + } + Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20) + Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024) + Indexer.IssueIndexerQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType) + Indexer.IssueIndexerQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue")) + Indexer.IssueIndexerQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20) +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 5f65570540..4c016f3489 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -179,15 +179,6 @@ var ( DBConnectRetries int DBConnectBackoff time.Duration - // Indexer settings - Indexer struct { - IssuePath string - RepoIndexerEnabled bool - RepoPath string - UpdateQueueLength int - MaxIndexerFileSize int64 - } - // Repository settings Repository = struct { AnsiCharset string @@ -1214,6 +1205,7 @@ func NewContext() { IsInputFile: sec.Key("IS_INPUT_FILE").MustBool(false), }) } + sec = Cfg.Section("U2F") U2F.TrustedFacets, _ = shellquote.Split(sec.Key("TRUSTED_FACETS").MustString(strings.TrimRight(AppURL, "/"))) U2F.AppID = sec.Key("APP_ID").MustString(strings.TrimRight(AppURL, "/")) @@ -1240,4 +1232,5 @@ func NewServices() { newRegisterMailService() newNotifyMailService() newWebhookService() + newIndexerService() } |