summaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2019-02-19 22:39:39 +0800
committertechknowlogick <matti@mdranta.net>2019-02-19 09:39:39 -0500
commit830ae614560b0c504c00d693b63d9889bac1a2d8 (patch)
tree5fd933f8124f4dd30d0215def2a7bcc0181573be /modules/indexer/issues
parent094263db4d9f1b53c4b4c021005eec07baddd253 (diff)
downloadgitea-830ae614560b0c504c00d693b63d9889bac1a2d8.tar.gz
gitea-830ae614560b0c504c00d693b63d9889bac1a2d8.zip
Refactor issue indexer (#5363)
Diffstat (limited to 'modules/indexer/issues')
-rw-r--r--modules/indexer/issues/bleve.go250
-rw-r--r--modules/indexer/issues/bleve_test.go88
-rw-r--r--modules/indexer/issues/indexer.go36
-rw-r--r--modules/indexer/issues/queue.go11
-rw-r--r--modules/indexer/issues/queue_channel.go56
-rw-r--r--modules/indexer/issues/queue_disk.go104
6 files changed, 545 insertions, 0 deletions
diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go
new file mode 100644
index 0000000000..36279198b8
--- /dev/null
+++ b/modules/indexer/issues/bleve.go
@@ -0,0 +1,250 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+
+ "github.com/blevesearch/bleve"
+ "github.com/blevesearch/bleve/analysis/analyzer/custom"
+ "github.com/blevesearch/bleve/analysis/token/lowercase"
+ "github.com/blevesearch/bleve/analysis/token/unicodenorm"
+ "github.com/blevesearch/bleve/analysis/tokenizer/unicode"
+ "github.com/blevesearch/bleve/index/upsidedown"
+ "github.com/blevesearch/bleve/mapping"
+ "github.com/blevesearch/bleve/search/query"
+ "github.com/ethantkoenig/rupture"
+)
+
+const (
+ issueIndexerAnalyzer = "issueIndexer"
+ issueIndexerDocType = "issueIndexerDocType"
+ issueIndexerLatestVersion = 1
+)
+
+// indexerID a bleve-compatible unique identifier for an integer id
+func indexerID(id int64) string {
+ return strconv.FormatInt(id, 36)
+}
+
+// idOfIndexerID the integer id associated with an indexer id
+func idOfIndexerID(indexerID string) (int64, error) {
+ id, err := strconv.ParseInt(indexerID, 36, 64)
+ if err != nil {
+ return 0, fmt.Errorf("Unexpected indexer ID %s: %v", indexerID, err)
+ }
+ return id, nil
+}
+
+// numericEqualityQuery a numeric equality query for the given value and field
+func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery {
+ f := float64(value)
+ tru := true
+ q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru)
+ q.SetField(field)
+ return q
+}
+
+func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhraseQuery {
+ q := bleve.NewMatchPhraseQuery(matchPhrase)
+ q.FieldVal = field
+ q.Analyzer = analyzer
+ return q
+}
+
+const unicodeNormalizeName = "unicodeNormalize"
+
+func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
+ return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
+ "type": unicodenorm.Name,
+ "form": unicodenorm.NFC,
+ })
+}
+
+const maxBatchSize = 16
+
+// openIndexer open the index at the specified path, checking for metadata
+// updates and bleve version updates. If index needs to be created (or
+// re-created), returns (nil, nil)
+func openIndexer(path string, latestVersion int) (bleve.Index, error) {
+ _, err := os.Stat(path)
+ if err != nil && os.IsNotExist(err) {
+ return nil, nil
+ } else if err != nil {
+ return nil, err
+ }
+
+ metadata, err := rupture.ReadIndexMetadata(path)
+ if err != nil {
+ return nil, err
+ }
+ if metadata.Version < latestVersion {
+ // the indexer is using a previous version, so we should delete it and
+ // re-populate
+ return nil, os.RemoveAll(path)
+ }
+
+ index, err := bleve.Open(path)
+ if err != nil && err == upsidedown.IncompatibleVersion {
+ // the indexer was built with a previous version of bleve, so we should
+ // delete it and re-populate
+ return nil, os.RemoveAll(path)
+ } else if err != nil {
+ return nil, err
+ }
+
+ return index, nil
+}
+
+// BleveIndexerData an update to the issue indexer
+type BleveIndexerData IndexerData
+
+// Type returns the document type, for bleve's mapping.Classifier interface.
+func (i *BleveIndexerData) Type() string {
+ return issueIndexerDocType
+}
+
+// createIssueIndexer create an issue indexer if one does not already exist
+func createIssueIndexer(path string, latestVersion int) (bleve.Index, error) {
+ mapping := bleve.NewIndexMapping()
+ docMapping := bleve.NewDocumentMapping()
+
+ numericFieldMapping := bleve.NewNumericFieldMapping()
+ numericFieldMapping.IncludeInAll = false
+ docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
+
+ textFieldMapping := bleve.NewTextFieldMapping()
+ textFieldMapping.Store = false
+ textFieldMapping.IncludeInAll = false
+ docMapping.AddFieldMappingsAt("Title", textFieldMapping)
+ docMapping.AddFieldMappingsAt("Content", textFieldMapping)
+ docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
+
+ if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
+ return nil, err
+ } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{
+ "type": custom.Name,
+ "char_filters": []string{},
+ "tokenizer": unicode.Name,
+ "token_filters": []string{unicodeNormalizeName, lowercase.Name},
+ }); err != nil {
+ return nil, err
+ }
+
+ mapping.DefaultAnalyzer = issueIndexerAnalyzer
+ mapping.AddDocumentMapping(issueIndexerDocType, docMapping)
+ mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
+
+ index, err := bleve.New(path, mapping)
+ if err != nil {
+ return nil, err
+ }
+
+ if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{
+ Version: latestVersion,
+ }); err != nil {
+ return nil, err
+ }
+ return index, nil
+}
+
+var (
+ _ Indexer = &BleveIndexer{}
+)
+
+// BleveIndexer implements Indexer interface
+type BleveIndexer struct {
+ indexDir string
+ indexer bleve.Index
+}
+
+// NewBleveIndexer creates a new bleve local indexer
+func NewBleveIndexer(indexDir string) *BleveIndexer {
+ return &BleveIndexer{
+ indexDir: indexDir,
+ }
+}
+
+// Init will initial the indexer
+func (b *BleveIndexer) Init() (bool, error) {
+ var err error
+ b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion)
+ if err != nil {
+ return false, err
+ }
+ if b.indexer != nil {
+ return true, nil
+ }
+
+ b.indexer, err = createIssueIndexer(b.indexDir, issueIndexerLatestVersion)
+ return false, err
+}
+
+// Index will save the index data
+func (b *BleveIndexer) Index(issues []*IndexerData) error {
+ batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+ for _, issue := range issues {
+ if err := batch.Index(indexerID(issue.ID), struct {
+ RepoID int64
+ Title string
+ Content string
+ Comments []string
+ }{
+ RepoID: issue.RepoID,
+ Title: issue.Title,
+ Content: issue.Content,
+ Comments: issue.Comments,
+ }); err != nil {
+ return err
+ }
+ }
+ return batch.Flush()
+}
+
+// Delete deletes indexes by ids
+func (b *BleveIndexer) Delete(ids ...int64) error {
+ batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+ for _, id := range ids {
+ if err := batch.Delete(indexerID(id)); err != nil {
+ return err
+ }
+ }
+ return batch.Flush()
+}
+
+// Search searches for issues by given conditions.
+// Returns the matching issue IDs
+func (b *BleveIndexer) Search(keyword string, repoID int64, limit, start int) (*SearchResult, error) {
+ indexerQuery := bleve.NewConjunctionQuery(
+ numericEqualityQuery(repoID, "RepoID"),
+ bleve.NewDisjunctionQuery(
+ newMatchPhraseQuery(keyword, "Title", issueIndexerAnalyzer),
+ newMatchPhraseQuery(keyword, "Content", issueIndexerAnalyzer),
+ newMatchPhraseQuery(keyword, "Comments", issueIndexerAnalyzer),
+ ))
+ search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false)
+
+ result, err := b.indexer.Search(search)
+ if err != nil {
+ return nil, err
+ }
+
+ var ret = SearchResult{
+ Hits: make([]Match, 0, len(result.Hits)),
+ }
+ for _, hit := range result.Hits {
+ id, err := idOfIndexerID(hit.ID)
+ if err != nil {
+ return nil, err
+ }
+ ret.Hits = append(ret.Hits, Match{
+ ID: id,
+ RepoID: repoID,
+ })
+ }
+ return &ret, nil
+}
diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve_test.go
new file mode 100644
index 0000000000..720266e3b5
--- /dev/null
+++ b/modules/indexer/issues/bleve_test.go
@@ -0,0 +1,88 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestIndexAndSearch(t *testing.T) {
+ dir := "./bleve.index"
+ indexer := NewBleveIndexer(dir)
+ defer os.RemoveAll(dir)
+
+ _, err := indexer.Init()
+ assert.NoError(t, err)
+
+ err = indexer.Index([]*IndexerData{
+ {
+ ID: 1,
+ RepoID: 2,
+ Title: "Issue search should support Chinese",
+ Content: "As title",
+ Comments: []string{
+ "test1",
+ "test2",
+ },
+ },
+ {
+ ID: 2,
+ RepoID: 2,
+ Title: "CJK support could be optional",
+ Content: "Chinese Korean and Japanese should be supported but I would like it's not enabled by default",
+ Comments: []string{
+ "LGTM",
+ "Good idea",
+ },
+ },
+ })
+ assert.NoError(t, err)
+
+ var (
+ keywords = []struct {
+ Keyword string
+ IDs []int64
+ }{
+ {
+ Keyword: "search",
+ IDs: []int64{1},
+ },
+ {
+ Keyword: "test1",
+ IDs: []int64{1},
+ },
+ {
+ Keyword: "test2",
+ IDs: []int64{1},
+ },
+ {
+ Keyword: "support",
+ IDs: []int64{1, 2},
+ },
+ {
+ Keyword: "chinese",
+ IDs: []int64{1, 2},
+ },
+ {
+ Keyword: "help",
+ IDs: []int64{},
+ },
+ }
+ )
+
+ for _, kw := range keywords {
+ res, err := indexer.Search(kw.Keyword, 2, 10, 0)
+ assert.NoError(t, err)
+
+ var ids = make([]int64, 0, len(res.Hits))
+ for _, hit := range res.Hits {
+ ids = append(ids, hit.ID)
+ }
+ assert.EqualValues(t, kw.IDs, ids)
+ }
+}
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
new file mode 100644
index 0000000000..c31006d0dd
--- /dev/null
+++ b/modules/indexer/issues/indexer.go
@@ -0,0 +1,36 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+// IndexerData data stored in the issue indexer
+type IndexerData struct {
+ ID int64
+ RepoID int64
+ Title string
+ Content string
+ Comments []string
+ IsDelete bool
+ IDs []int64
+}
+
+// Match represents on search result
+type Match struct {
+ ID int64 `json:"id"`
+ RepoID int64 `json:"repo_id"`
+ Score float64 `json:"score"`
+}
+
+// SearchResult represents search results
+type SearchResult struct {
+ Hits []Match
+}
+
+// Indexer defines an inteface to indexer issues contents
+type Indexer interface {
+ Init() (bool, error)
+ Index(issue []*IndexerData) error
+ Delete(ids ...int64) error
+ Search(kw string, repoID int64, limit, start int) (*SearchResult, error)
+}
diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go
new file mode 100644
index 0000000000..6f4ee4c13a
--- /dev/null
+++ b/modules/indexer/issues/queue.go
@@ -0,0 +1,11 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+// Queue defines an interface to save an issue indexer queue
+type Queue interface {
+ Run() error
+ Push(*IndexerData)
+}
diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go
new file mode 100644
index 0000000000..99a90ad499
--- /dev/null
+++ b/modules/indexer/issues/queue_channel.go
@@ -0,0 +1,56 @@
+// Copyright 2018 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "time"
+
+ "code.gitea.io/gitea/modules/setting"
+)
+
+// ChannelQueue implements
+type ChannelQueue struct {
+ queue chan *IndexerData
+ indexer Indexer
+ batchNumber int
+}
+
+// NewChannelQueue create a memory channel queue
+func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
+ return &ChannelQueue{
+ queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
+ indexer: indexer,
+ batchNumber: batchNumber,
+ }
+}
+
+// Run starts to run the queue
+func (c *ChannelQueue) Run() error {
+ var i int
+ var datas = make([]*IndexerData, 0, c.batchNumber)
+ for {
+ select {
+ case data := <-c.queue:
+ datas = append(datas, data)
+ if len(datas) >= c.batchNumber {
+ c.indexer.Index(datas)
+ // TODO: save the point
+ datas = make([]*IndexerData, 0, c.batchNumber)
+ }
+ case <-time.After(time.Millisecond * 100):
+ i++
+ if i >= 3 && len(datas) > 0 {
+ c.indexer.Index(datas)
+ // TODO: save the point
+ datas = make([]*IndexerData, 0, c.batchNumber)
+ }
+ }
+ }
+}
+
+// Push will push the indexer data to queue
+func (c *ChannelQueue) Push(data *IndexerData) {
+ c.queue <- data
+}
diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go
new file mode 100644
index 0000000000..97e9a3d965
--- /dev/null
+++ b/modules/indexer/issues/queue_disk.go
@@ -0,0 +1,104 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "encoding/json"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+ "github.com/lunny/levelqueue"
+)
+
+var (
+ _ Queue = &LevelQueue{}
+)
+
+// LevelQueue implements a disk library queue
+type LevelQueue struct {
+ indexer Indexer
+ queue *levelqueue.Queue
+ batchNumber int
+}
+
+// NewLevelQueue creates a ledis local queue
+func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
+ queue, err := levelqueue.Open(dataDir)
+ if err != nil {
+ return nil, err
+ }
+
+ return &LevelQueue{
+ indexer: indexer,
+ queue: queue,
+ batchNumber: batchNumber,
+ }, nil
+}
+
+// Run starts to run the queue
+func (l *LevelQueue) Run() error {
+ var i int
+ var datas = make([]*IndexerData, 0, l.batchNumber)
+ for {
+ bs, err := l.queue.RPop()
+ if err != nil {
+ log.Error(4, "RPop: %v", err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ i++
+ if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
+ l.indexer.Index(datas)
+ datas = make([]*IndexerData, 0, l.batchNumber)
+ i = 0
+ }
+
+ if len(bs) <= 0 {
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ var data IndexerData
+ err = json.Unmarshal(bs, &data)
+ if err != nil {
+ log.Error(4, "Unmarshal: %v", err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ log.Trace("LedisLocalQueue: task found: %#v", data)
+
+ if data.IsDelete {
+ if data.ID > 0 {
+ if err = l.indexer.Delete(data.ID); err != nil {
+ log.Error(4, "indexer.Delete: %v", err)
+ }
+ } else if len(data.IDs) > 0 {
+ if err = l.indexer.Delete(data.IDs...); err != nil {
+ log.Error(4, "indexer.Delete: %v", err)
+ }
+ }
+ time.Sleep(time.Millisecond * 10)
+ continue
+ }
+
+ datas = append(datas, &data)
+ time.Sleep(time.Millisecond * 10)
+ }
+}
+
+// Push will push the indexer data to queue
+func (l *LevelQueue) Push(data *IndexerData) {
+ bs, err := json.Marshal(data)
+ if err != nil {
+ log.Error(4, "Marshal: %v", err)
+ return
+ }
+ err = l.queue.LPush(bs)
+ if err != nil {
+ log.Error(4, "LPush: %v", err)
+ }
+}