aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Ethan Koenig <ethantkoenig@gmail.com> 2017-09-24 17:08:48 -0700
committer Lauris BH <lauris@nix.lv> 2017-09-25 03:08:48 +0300
commit fa28de820e3ee70a2a0b6aa9c9cfa358675b3874 (patch)
tree 3f67133fbbd184f58669881a931d87b31c153b0c
parent 0b0d85c90d609b84b26dc4f6ae31c4652f9243bb (diff)
download gitea-fa28de820e3ee70a2a0b6aa9c9cfa358675b3874.tar.gz
gitea-fa28de820e3ee70a2a0b6aa9c9cfa358675b3874.zip
Make indexer code more reusable (#2590)
-rw-r--r-- models/issue_indexer.go 37
-rw-r--r-- modules/indexer/indexer.go 49
-rw-r--r-- modules/indexer/issue.go 32
3 files changed, 83 insertions, 35 deletions
diff --git a/models/issue_indexer.go b/models/issue_indexer.go
index 1e14268a0e..b58c9dc2d1 100644
--- a/models/issue_indexer.go
+++ b/models/issue_indexer.go
@@ -25,6 +25,7 @@ func InitIssueIndexer() {
// populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer() error {
+ batch := indexer.IssueIndexerBatch()
for page := 1; ; page++ {
repos, _, err := Repositories(&SearchRepoOptions{
Page: page,
@@ -34,7 +35,7 @@ func populateIssueIndexer() error {
return fmt.Errorf("Repositories: %v", err)
}
if len(repos) == 0 {
- return nil
+ return batch.Flush()
}
for _, repo := range repos {
issues, err := Issues(&IssuesOptions{
@@ -42,29 +43,37 @@ func populateIssueIndexer() error {
IsClosed: util.OptionalBoolNone,
IsPull: util.OptionalBoolNone,
})
- updates := make([]indexer.IssueIndexerUpdate, len(issues))
- for i, issue := range issues {
- updates[i] = issue.update()
+ if err != nil {
+ return err
}
- if err = indexer.BatchUpdateIssues(updates...); err != nil {
- return fmt.Errorf("BatchUpdate: %v", err)
+ for _, issue := range issues {
+ if err := batch.Add(issue.update()); err != nil {
+ return err
+ }
}
}
}
}
func processIssueIndexerUpdateQueue() {
+ batch := indexer.IssueIndexerBatch()
for {
+ var issueID int64
select {
- case issueID := <-issueIndexerUpdateQueue:
- issue, err := GetIssueByID(issueID)
- if err != nil {
- log.Error(4, "issuesIndexer.Index: %v", err)
- continue
- }
- if err = indexer.UpdateIssue(issue.update()); err != nil {
- log.Error(4, "issuesIndexer.Index: %v", err)
+ case issueID = <-issueIndexerUpdateQueue:
+ default:
+ // flush whatever updates we currently have, since we
+ // might have to wait a while
+ if err := batch.Flush(); err != nil {
+ log.Error(4, "IssueIndexer: %v", err)
}
+ issueID = <-issueIndexerUpdateQueue
+ }
+ issue, err := GetIssueByID(issueID)
+ if err != nil {
+ log.Error(4, "GetIssueByID: %v", err)
+ } else if err = batch.Add(issue.update()); err != nil {
+ log.Error(4, "IssueIndexer: %v", err)
}
}
}
diff --git a/modules/indexer/indexer.go b/modules/indexer/indexer.go
index 5ee813412d..d5bdd51f9c 100644
--- a/modules/indexer/indexer.go
+++ b/modules/indexer/indexer.go
@@ -9,6 +9,8 @@ import (
"strconv"
"github.com/blevesearch/bleve"
+ "github.com/blevesearch/bleve/analysis/token/unicodenorm"
+ "github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
)
@@ -41,3 +43,50 @@ func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhrase
q.Analyzer = analyzer
return q
}
+
+const unicodeNormalizeName = "unicodeNormalize"
+
+func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
+ return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
+ "type": unicodenorm.Name,
+ "form": unicodenorm.NFC,
+ })
+}
+
+// Update represents an update to an indexer
+type Update interface {
+ addToBatch(batch *bleve.Batch) error
+}
+
+const maxBatchSize = 16
+
+// Batch is a batch of indexer updates that automatically flushes once its
+// size reaches maxBatchSize.
+type Batch struct {
+ batch *bleve.Batch
+ index bleve.Index
+}
+
+// Add adds an update to the batch, flushing the batch if it has become full.
+func (batch *Batch) Add(update Update) error {
+ if err := update.addToBatch(batch.batch); err != nil {
+ return err
+ }
+ return batch.flushIfFull()
+}
+
+func (batch *Batch) flushIfFull() error {
+ if batch.batch.Size() >= maxBatchSize {
+ return batch.Flush()
+ }
+ return nil
+}
+
+// Flush writes the batch to the index regardless of its size, resetting it on success.
+func (batch *Batch) Flush() error {
+ if err := batch.index.Batch(batch.batch); err != nil {
+ return err
+ }
+ batch.batch.Reset()
+ return nil
+}
diff --git a/modules/indexer/issue.go b/modules/indexer/issue.go
index 050a623ce2..62a18e2b3b 100644
--- a/modules/indexer/issue.go
+++ b/modules/indexer/issue.go
@@ -13,7 +13,6 @@ import (
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
- "github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown"
)
@@ -35,6 +34,10 @@ type IssueIndexerUpdate struct {
Data *IssueIndexerData
}
+func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
+ return batch.Index(indexerID(update.IssueID), update.Data)
+}
+
const issueIndexerAnalyzer = "issueIndexer"
// InitIssueIndexer initialize issue indexer
@@ -74,17 +77,13 @@ func createIssueIndexer() error {
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
- const unicodeNormNFC = "unicodeNormNFC"
- if err := mapping.AddCustomTokenFilter(unicodeNormNFC, map[string]interface{}{
- "type": unicodenorm.Name,
- "form": unicodenorm.NFC,
- }); err != nil {
+ if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
return err
} else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{
"type": custom.Name,
"char_filters": []string{},
"tokenizer": unicode.Name,
- "token_filters": []string{unicodeNormNFC, lowercase.Name},
+ "token_filters": []string{unicodeNormalizeName, lowercase.Name},
}); err != nil {
return err
}
@@ -97,21 +96,12 @@ func createIssueIndexer() error {
return err
}
-// UpdateIssue update the issue indexer
-func UpdateIssue(update IssueIndexerUpdate) error {
- return issueIndexer.Index(indexerID(update.IssueID), update.Data)
-}
-
-// BatchUpdateIssues perform a batch update of the issue indexer
-func BatchUpdateIssues(updates ...IssueIndexerUpdate) error {
- batch := issueIndexer.NewBatch()
- for _, update := range updates {
- err := batch.Index(indexerID(update.IssueID), update.Data)
- if err != nil {
- return err
- }
+// IssueIndexerBatch returns a new Batch for queuing updates to the issue indexer.
+func IssueIndexerBatch() *Batch {
+ return &Batch{
+ batch: issueIndexer.NewBatch(),
+ index: issueIndexer,
}
- return issueIndexer.Batch(batch)
}
// SearchIssuesByKeyword searches for issues by given conditions.