diff options
Diffstat (limited to 'modules/indexer/issues')
-rw-r--r-- | modules/indexer/issues/bleve.go | 14 | ||||
-rw-r--r-- | modules/indexer/issues/bleve_test.go | 3 | ||||
-rw-r--r-- | modules/indexer/issues/db.go | 28 | ||||
-rw-r--r-- | modules/indexer/issues/elastic_search.go | 123 | ||||
-rw-r--r-- | modules/indexer/issues/indexer.go | 55 | ||||
-rw-r--r-- | modules/indexer/issues/indexer_test.go | 17 |
6 files changed, 199 insertions, 41 deletions
diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go index d986a0e55e..c298b7de3e 100644 --- a/modules/indexer/issues/bleve.go +++ b/modules/indexer/issues/bleve.go @@ -5,6 +5,7 @@ package issues import ( + "context" "fmt" "os" "strconv" @@ -186,6 +187,15 @@ func (b *BleveIndexer) Init() (bool, error) { return false, err } +// SetAvailabilityChangeCallback does nothing +func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) { +} + +// Ping does nothing +func (b *BleveIndexer) Ping() bool { + return true +} + // Close will close the bleve indexer func (b *BleveIndexer) Close() { if b.indexer != nil { @@ -229,7 +239,7 @@ func (b *BleveIndexer) Delete(ids ...int64) error { // Search searches for issues by given conditions. // Returns the matching issue IDs -func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { +func (b *BleveIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { var repoQueriesP []*query.NumericRangeQuery for _, repoID := range repoIDs { repoQueriesP = append(repoQueriesP, numericEqualityQuery(repoID, "RepoID")) @@ -249,7 +259,7 @@ func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int) search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false) search.SortBy([]string{"-_score"}) - result, err := b.indexer.Search(search) + result, err := b.indexer.SearchInContext(ctx, search) if err != nil { return nil, err } diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve_test.go index df036fb573..926c32e242 100644 --- a/modules/indexer/issues/bleve_test.go +++ b/modules/indexer/issues/bleve_test.go @@ -5,6 +5,7 @@ package issues import ( + "context" "os" "testing" @@ -84,7 +85,7 @@ func TestBleveIndexAndSearch(t *testing.T) { } for _, kw := range keywords { - res, err := indexer.Search(kw.Keyword, []int64{2}, 10, 0) + res, err := indexer.Search(context.TODO(), kw.Keyword, []int64{2}, 10, 0) assert.NoError(t, err) ids := make([]int64, 0, len(res.Hits)) diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go index f02cbddce8..e2badf64f2 100644 --- a/modules/indexer/issues/db.go +++ b/modules/indexer/issues/db.go @@ -4,33 +4,47 @@ package issues -import "code.gitea.io/gitea/models" +import ( + "context" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/models/db" +) // DBIndexer implements Indexer interface to use database's like search type DBIndexer struct{} // Init dummy function -func (db *DBIndexer) Init() (bool, error) { +func (i *DBIndexer) Init() (bool, error) { return false, nil } +// SetAvailabilityChangeCallback dummy function +func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) { +} + +// Ping checks if database is available +func (i *DBIndexer) Ping() bool { + return db.GetEngine(db.DefaultContext).Ping() != nil +} + // Index dummy function -func (db *DBIndexer) Index(issue []*IndexerData) error { +func (i *DBIndexer) Index(issue []*IndexerData) error { return nil } // Delete dummy function -func (db *DBIndexer) Delete(ids ...int64) error { +func (i *DBIndexer) Delete(ids ...int64) error { return nil } // Close dummy function -func (db *DBIndexer) Close() { +func (i *DBIndexer) Close() { } // Search dummy function -func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { - total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start) +func (i *DBIndexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { + total, ids, err := models.SearchIssueIDsByKeyword(ctx, kw, repoIDs, limit, start) if err != nil { return nil, err } diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go index 187b69b749..97e32b8975 100644 --- a/modules/indexer/issues/elastic_search.go +++ b/modules/indexer/issues/elastic_search.go @@ -8,9 +8,12 @@ import ( "context" "errors" "fmt" + "net" "strconv" + "sync" "time" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "github.com/olivere/elastic/v7" @@ -20,8 +23,12 @@ var _ Indexer = &ElasticSearchIndexer{} // ElasticSearchIndexer implements Indexer interface type ElasticSearchIndexer struct { - client *elastic.Client - indexerName string + client *elastic.Client + indexerName string + available bool + availabilityCallback func(bool) + stopTimer chan struct{} + lock sync.RWMutex } type elasticLogger struct { @@ -56,10 +63,27 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, er return nil, err } - return &ElasticSearchIndexer{ + indexer := &ElasticSearchIndexer{ client: client, indexerName: indexerName, - }, nil + available: true, + stopTimer: make(chan struct{}), + } + + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case <-ticker.C: + indexer.checkAvailability() + case <-indexer.stopTimer: + ticker.Stop() + return + } + } + }() + + return indexer, nil } const ( @@ -93,10 +117,10 @@ const ( // Init will initialize the indexer func (b *ElasticSearchIndexer) Init() (bool, error) { - ctx := context.Background() + ctx := graceful.GetManager().HammerContext() exists, err := b.client.IndexExists(b.indexerName).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !exists { @@ -104,7 +128,7 @@ func (b *ElasticSearchIndexer) Init() (bool, error) { createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !createIndex.Acknowledged { return false, errors.New("init failed") @@ -115,6 +139,20 @@ func (b *ElasticSearchIndexer) Init() (bool, error) { return true, nil } +// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes +func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) { + b.lock.Lock() + defer b.lock.Unlock() + b.availabilityCallback = callback +} + +// Ping checks if elastic is available +func (b *ElasticSearchIndexer) Ping() bool { + b.lock.RLock() + defer b.lock.RUnlock() + return b.available +} + // Index will save the index data func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error { if len(issues) == 0 { @@ -131,8 +169,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error { "content": issue.Content, "comments": issue.Comments, }). - Do(context.Background()) - return err + Do(graceful.GetManager().HammerContext()) + return b.checkError(err) } reqs := make([]elastic.BulkableRequest, 0) @@ -154,8 +192,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error { _, err := b.client.Bulk(). Index(b.indexerName). Add(reqs...). - Do(context.Background()) - return err + Do(graceful.GetManager().HammerContext()) + return b.checkError(err) } // Delete deletes indexes by ids @@ -166,8 +204,8 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error { _, err := b.client.Delete(). Index(b.indexerName). Id(fmt.Sprintf("%d", ids[0])). - Do(context.Background()) - return err + Do(graceful.GetManager().HammerContext()) + return b.checkError(err) } reqs := make([]elastic.BulkableRequest, 0) @@ -182,13 +220,13 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error { _, err := b.client.Bulk(). Index(b.indexerName). Add(reqs...). - Do(context.Background()) - return err + Do(graceful.GetManager().HammerContext()) + return b.checkError(err) } // Search searches for issues by given conditions. // Returns the matching issue IDs -func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { +func (b *ElasticSearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments") query := elastic.NewBoolQuery() query = query.Must(kwQuery) @@ -205,9 +243,9 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st Query(query). Sort("_score", false). From(start).Size(limit). - Do(context.Background()) + Do(ctx) if err != nil { - return nil, err + return nil, b.checkError(err) } hits := make([]Match, 0, limit) @@ -225,4 +263,51 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st } // Close implements indexer -func (b *ElasticSearchIndexer) Close() {} +func (b *ElasticSearchIndexer) Close() { + select { + case <-b.stopTimer: + default: + close(b.stopTimer) + } +} + +func (b *ElasticSearchIndexer) checkError(err error) error { + var opErr *net.OpError + if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) { + return err + } + + b.setAvailability(false) + + return err +} + +func (b *ElasticSearchIndexer) checkAvailability() { + if b.Ping() { + return + } + + // Request cluster state to check if elastic is available again + _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext()) + if err != nil { + b.setAvailability(false) + return + } + + b.setAvailability(true) +} + +func (b *ElasticSearchIndexer) setAvailability(available bool) { + b.lock.Lock() + defer b.lock.Unlock() + + if b.available == available { + return + } + + b.available = available + if b.availabilityCallback != nil { + // Call the callback from within the lock to ensure that the ordering remains correct + b.availabilityCallback(b.available) + } +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 729981ec71..3aaa27eed2 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -47,9 +47,11 @@ type SearchResult struct { // Indexer defines an interface to indexer issues contents type Indexer interface { Init() (bool, error) + Ping() bool + SetAvailabilityChangeCallback(callback func(bool)) Index(issue []*IndexerData) error Delete(ids ...int64) error - Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) + Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) Close() } @@ -111,6 +113,7 @@ func InitIssueIndexer(syncReindex bool) { } iData := make([]*IndexerData, 0, len(data)) + unhandled := make([]queue.Data, 0, len(data)) for _, datum := range data { indexerData, ok := datum.(*IndexerData) if !ok { @@ -119,13 +122,34 @@ func InitIssueIndexer(syncReindex bool) { } log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) if indexerData.IsDelete { - _ = indexer.Delete(indexerData.IDs...) + if err := indexer.Delete(indexerData.IDs...); err != nil { + log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err) + if indexer.Ping() { + continue + } + // Add back to queue + unhandled = append(unhandled, datum) + } continue } iData = append(iData, indexerData) } + if len(unhandled) > 0 { + for _, indexerData := range iData { + unhandled = append(unhandled, indexerData) + } + return unhandled + } if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) + if indexer.Ping() { + return nil + } + // Add back to queue + for _, indexerData := range iData { + unhandled = append(unhandled, indexerData) + } + return unhandled } return nil } @@ -193,6 +217,18 @@ func InitIssueIndexer(syncReindex bool) { log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } + if queue, ok := issueIndexerQueue.(queue.Pausable); ok { + holder.get().SetAvailabilityChangeCallback(func(available bool) { + if !available { + log.Info("Issue index queue paused") + queue.Pause() + } else { + log.Info("Issue index queue resumed") + queue.Resume() + } + }) + } + // Start processing the queue go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run) @@ -334,7 +370,7 @@ func DeleteRepoIssueIndexer(repo *repo_model.Repository) { // SearchIssuesByKeyword search issue ids by keywords and repo id // WARNNING: You have to ensure user have permission to visit repoIDs' issues -func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) { +func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) { var issueIDs []int64 indexer := holder.get() @@ -342,7 +378,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) { log.Error("SearchIssuesByKeyword(): unable to get indexer!") return nil, fmt.Errorf("unable to get issue indexer") } - res, err := indexer.Search(keyword, repoIDs, 50, 0) + res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0) if err != nil { return nil, err } @@ -351,3 +387,14 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) { } return issueIDs, nil } + +// IsAvailable checks if issue indexer is available +func IsAvailable() bool { + indexer := holder.get() + if indexer == nil { + log.Error("IsAvailable(): unable to get indexer!") + return false + } + + return indexer.Ping() +} diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go index ee6ebcdd18..d516615b56 100644 --- a/modules/indexer/issues/indexer_test.go +++ b/modules/indexer/issues/indexer_test.go @@ -5,6 +5,7 @@ package issues import ( + "context" "os" "path" "path/filepath" @@ -56,19 +57,19 @@ func TestBleveSearchIssues(t *testing.T) { time.Sleep(5 * time.Second) - ids, err := SearchIssuesByKeyword([]int64{1}, "issue2") + ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2") assert.NoError(t, err) assert.EqualValues(t, []int64{2}, ids) - ids, err = SearchIssuesByKeyword([]int64{1}, "first") + ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first") assert.NoError(t, err) assert.EqualValues(t, []int64{1}, ids) - ids, err = SearchIssuesByKeyword([]int64{1}, "for") + ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for") assert.NoError(t, err) assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids) - ids, err = SearchIssuesByKeyword([]int64{1}, "good") + ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good") assert.NoError(t, err) assert.EqualValues(t, []int64{1}, ids) } @@ -79,19 +80,19 @@ func TestDBSearchIssues(t *testing.T) { setting.Indexer.IssueType = "db" InitIssueIndexer(true) - ids, err := SearchIssuesByKeyword([]int64{1}, "issue2") + ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2") assert.NoError(t, err) assert.EqualValues(t, []int64{2}, ids) - ids, err = SearchIssuesByKeyword([]int64{1}, "first") + ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first") assert.NoError(t, err) assert.EqualValues(t, []int64{1}, ids) - ids, err = SearchIssuesByKeyword([]int64{1}, "for") + ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for") assert.NoError(t, err) assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids) - ids, err = SearchIssuesByKeyword([]int64{1}, "good") + ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good") assert.NoError(t, err) assert.EqualValues(t, []int64{1}, ids) } |