]> source.dussan.org Git - gitea.git/commitdiff
Fix data race in bleve indexer (#16474) (#16509)
author6543 <6543@obermui.de>
Thu, 22 Jul 2021 03:42:32 +0000 (05:42 +0200)
committerGitHub <noreply@github.com>
Thu, 22 Jul 2021 03:42:32 +0000 (11:42 +0800)
* Fix data race in bleve indexer

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
modules/indexer/bleve/batch.go [new file with mode: 0644]
modules/indexer/code/bleve.go
modules/indexer/issues/bleve.go

diff --git a/modules/indexer/bleve/batch.go b/modules/indexer/bleve/batch.go
new file mode 100644 (file)
index 0000000..79994e6
--- /dev/null
@@ -0,0 +1,59 @@
+// Copyright 2021 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package bleve
+
+import (
+       "github.com/blevesearch/bleve/v2"
+)
+
+// FlushingBatch is a batch of operations that automatically flushes to the
+// underlying index once it reaches a certain size.
+type FlushingBatch struct {
+       maxBatchSize int
+       batch        *bleve.Batch
+       index        bleve.Index
+}
+
+// NewFlushingBatch creates a new flushing batch for the specified index. Once
+// the number of operations in the batch reaches the specified limit, the batch
+// automatically flushes its operations to the index.
+func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
+       return &FlushingBatch{
+               maxBatchSize: maxBatchSize,
+               batch:        index.NewBatch(),
+               index:        index,
+       }
+}
+
+// Index add a new index to batch
+func (b *FlushingBatch) Index(id string, data interface{}) error {
+       if err := b.batch.Index(id, data); err != nil {
+               return err
+       }
+       return b.flushIfFull()
+}
+
+// Delete add a delete index to batch
+func (b *FlushingBatch) Delete(id string) error {
+       b.batch.Delete(id)
+       return b.flushIfFull()
+}
+
+func (b *FlushingBatch) flushIfFull() error {
+       if b.batch.Size() < b.maxBatchSize {
+               return nil
+       }
+       return b.Flush()
+}
+
+// Flush submit the batch and create a new one
+func (b *FlushingBatch) Flush() error {
+       err := b.index.Batch(b.batch)
+       if err != nil {
+               return err
+       }
+       b.batch = b.index.NewBatch()
+       return nil
+}
index 600789a2840967fc83c1c89fc138813ff19b1680..fc5c602dbef082b62d2544b08cf4e753ed96515f 100644 (file)
@@ -18,6 +18,7 @@ import (
        "code.gitea.io/gitea/modules/analyze"
        "code.gitea.io/gitea/modules/charset"
        "code.gitea.io/gitea/modules/git"
+       gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
        "code.gitea.io/gitea/modules/log"
        "code.gitea.io/gitea/modules/setting"
        "code.gitea.io/gitea/modules/timeutil"
@@ -176,7 +177,8 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
        return indexer, created, err
 }
 
-func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
+func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string,
+       update fileUpdate, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
        // Ignore vendored files in code search
        if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
                return nil
@@ -229,7 +231,7 @@ func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *
        })
 }
 
-func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
+func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
        id := filenameIndexerID(repo.ID, filename)
        return batch.Delete(id)
 }
@@ -267,7 +269,7 @@ func (b *BleveIndexer) Close() {
 
 // Index indexes the data
 func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
-       batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+       batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
        if len(changes.Updates) > 0 {
 
                batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
@@ -296,7 +298,7 @@ func (b *BleveIndexer) Delete(repoID int64) error {
        if err != nil {
                return err
        }
-       batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+       batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
        for _, hit := range result.Hits {
                if err = batch.Delete(hit.ID); err != nil {
                        return err
index b1385eb676256d7a7bee5556da06d5f1c4f94338..db12874e84e585a496cfb6ef7f6460b199834b0a 100644 (file)
@@ -9,8 +9,10 @@ import (
        "os"
        "strconv"
 
+       gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
        "code.gitea.io/gitea/modules/log"
        "code.gitea.io/gitea/modules/util"
+
        "github.com/blevesearch/bleve/v2"
        "github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
        "github.com/blevesearch/bleve/v2/analysis/token/lowercase"
@@ -197,7 +199,7 @@ func (b *BleveIndexer) Close() {
 
 // Index will save the index data
 func (b *BleveIndexer) Index(issues []*IndexerData) error {
-       batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+       batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
        for _, issue := range issues {
                if err := batch.Index(indexerID(issue.ID), struct {
                        RepoID   int64
@@ -218,7 +220,7 @@ func (b *BleveIndexer) Index(issues []*IndexerData) error {
 
 // Delete deletes indexes by ids
 func (b *BleveIndexer) Delete(ids ...int64) error {
-       batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
+       batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
        for _, id := range ids {
                if err := batch.Delete(indexerID(id)); err != nil {
                        return err