diff options
Diffstat (limited to 'modules/indexer/code')
-rw-r--r-- | modules/indexer/code/bleve.go | 13 | ||||
-rw-r--r-- | modules/indexer/code/elastic_search.go | 131 | ||||
-rw-r--r-- | modules/indexer/code/indexer.go | 36 | ||||
-rw-r--r-- | modules/indexer/code/indexer_test.go | 3 | ||||
-rw-r--r-- | modules/indexer/code/search.go | 5 | ||||
-rw-r--r-- | modules/indexer/code/wrapped.go | 25 |
6 files changed, 180 insertions, 33 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index cfadcfebd8..e2e1532095 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -271,6 +271,15 @@ func (b *BleveIndexer) Close() { log.Info("PID: %d Repository Indexer closed", os.Getpid()) } +// SetAvailabilityChangeCallback does nothing +func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) { +} + +// Ping does nothing +func (b *BleveIndexer) Ping() bool { + return true +} + // Index indexes the data func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) @@ -319,7 +328,7 @@ func (b *BleveIndexer) Delete(repoID int64) error { // Search searches for files in the specified repo. // Returns the matching file-paths -func (b *BleveIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { +func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { var ( indexerQuery query.Query keywordQuery query.Query @@ -372,7 +381,7 @@ func (b *BleveIndexer) Search(repoIDs []int64, language, keyword string, page, p searchRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10)) } - result, err := b.indexer.Search(searchRequest) + result, err := b.indexer.SearchInContext(ctx, searchRequest) if err != nil { return 0, nil, nil, err } diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go index 9bd2fa301e..db37b4f66c 100644 --- a/modules/indexer/code/elastic_search.go +++ b/modules/indexer/code/elastic_search.go @@ -7,16 +7,20 @@ package code import ( "bufio" "context" + "errors" "fmt" "io" + "net" "strconv" "strings" + "sync" "time" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/analyze" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -39,8 +43,12 @@ var _ Indexer = &ElasticSearchIndexer{} // ElasticSearchIndexer implements Indexer interface type ElasticSearchIndexer struct { - client *elastic.Client - indexerAliasName string + client *elastic.Client + indexerAliasName string + available bool + availabilityCallback func(bool) + stopTimer chan struct{} + lock sync.RWMutex } type elasticLogger struct { @@ -78,7 +86,23 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bo indexer := &ElasticSearchIndexer{ client: client, indexerAliasName: indexerName, + available: true, + stopTimer: make(chan struct{}), } + + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case <-ticker.C: + indexer.checkAvailability() + case <-indexer.stopTimer: + ticker.Stop() + return + } + } + }() + exists, err := indexer.init() if err != nil { indexer.Close() @@ -123,17 +147,17 @@ func (b *ElasticSearchIndexer) realIndexerName() string { // Init will initialize the indexer func (b *ElasticSearchIndexer) init() (bool, error) { - ctx := context.Background() + ctx := graceful.GetManager().HammerContext() exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !exists { mapping := defaultMapping createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !createIndex.Acknowledged { return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping) @@ -143,7 +167,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) { // check version r, err := b.client.Aliases().Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } realIndexerNames := r.IndicesByAlias(b.indexerAliasName) @@ -152,10 +176,10 @@ func (b *ElasticSearchIndexer) init() (bool, error) { Add(b.realIndexerName(), b.indexerAliasName). Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !res.Acknowledged { - return false, fmt.Errorf("") + return false, fmt.Errorf("create alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) } } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() { log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", @@ -165,16 +189,30 @@ func (b *ElasticSearchIndexer) init() (bool, error) { Add(b.realIndexerName(), b.indexerAliasName). Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !res.Acknowledged { - return false, fmt.Errorf("") + return false, fmt.Errorf("change alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) } } return exists, nil } +// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes +func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) { + b.lock.Lock() + defer b.lock.Unlock() + b.availabilityCallback = callback +} + +// Ping checks if elastic is available +func (b *ElasticSearchIndexer) Ping() bool { + b.lock.RLock() + defer b.lock.RUnlock() + return b.available +} + func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { @@ -190,7 +228,7 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr return nil, err } if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil { - return nil, fmt.Errorf("Misformatted git cat-file output: %v", err) + return nil, fmt.Errorf("misformatted git cat-file output: %v", err) } } @@ -274,8 +312,8 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos _, err := b.client.Bulk(). Index(b.indexerAliasName). Add(reqs...). - Do(context.Background()) - return err + Do(ctx) + return b.checkError(err) } return nil } @@ -284,8 +322,8 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos func (b *ElasticSearchIndexer) Delete(repoID int64) error { _, err := b.client.DeleteByQuery(b.indexerAliasName). Query(elastic.NewTermsQuery("repo_id", repoID)). - Do(context.Background()) - return err + Do(graceful.GetManager().HammerContext()) + return b.checkError(err) } // indexPos find words positions for start and the following end on content. It will @@ -366,7 +404,7 @@ func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages { } // Search searches for codes and language stats by given conditions. -func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { +func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { searchType := esMultiMatchTypeBestFields if isMatch { searchType = esMultiMatchTypePhrasePrefix @@ -407,9 +445,9 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, ). Sort("repo_id", true). From(start).Size(pageSize). - Do(context.Background()) + Do(ctx) if err != nil { - return 0, nil, nil, err + return 0, nil, nil, b.checkError(err) } return convertResult(searchResult, kw, pageSize) @@ -421,9 +459,9 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, Aggregation("language", aggregation). Query(query). Size(0). // We only needs stats information - Do(context.Background()) + Do(ctx) if err != nil { - return 0, nil, nil, err + return 0, nil, nil, b.checkError(err) } query = query.Must(langQuery) @@ -438,9 +476,9 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, ). Sort("repo_id", true). From(start).Size(pageSize). - Do(context.Background()) + Do(ctx) if err != nil { - return 0, nil, nil, err + return 0, nil, nil, b.checkError(err) } total, hits, _, err := convertResult(searchResult, kw, pageSize) @@ -449,4 +487,51 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, } // Close implements indexer -func (b *ElasticSearchIndexer) Close() {} +func (b *ElasticSearchIndexer) Close() { + select { + case <-b.stopTimer: + default: + close(b.stopTimer) + } +} + +func (b *ElasticSearchIndexer) checkError(err error) error { + var opErr *net.OpError + if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) { + return err + } + + b.setAvailability(false) + + return err +} + +func (b *ElasticSearchIndexer) checkAvailability() { + if b.Ping() { + return + } + + // Request cluster state to check if elastic is available again + _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext()) + if err != nil { + b.setAvailability(false) + return + } + + b.setAvailability(true) +} + +func (b *ElasticSearchIndexer) setAvailability(available bool) { + b.lock.Lock() + defer b.lock.Unlock() + + if b.available == available { + return + } + + b.available = available + if b.availabilityCallback != nil { + // Call the callback from within the lock to ensure that the ordering remains correct + b.availabilityCallback(b.available) + } +} diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 9ae3abff60..d897fcccd5 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -42,9 +42,11 @@ type SearchResultLanguages struct { // Indexer defines an interface to index and search code contents type Indexer interface { + Ping() bool + SetAvailabilityChangeCallback(callback func(bool)) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error Delete(repoID int64) error - Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) + Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) Close() } @@ -140,6 +142,7 @@ func Init() { return data } + unhandled := make([]queue.Data, 0, len(data)) for _, datum := range data { indexerData, ok := datum.(*IndexerData) if !ok { @@ -150,10 +153,14 @@ func Init() { if err := index(ctx, indexer, indexerData.RepoID); err != nil { log.Error("index: %v", err) - continue + if indexer.Ping() { + continue + } + // Add back to queue + unhandled = append(unhandled, datum) } } - return nil + return unhandled } indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{}) @@ -212,6 +219,18 @@ func Init() { indexer.set(rIndexer) + if queue, ok := indexerQueue.(queue.Pausable); ok { + rIndexer.SetAvailabilityChangeCallback(func(available bool) { + if !available { + log.Info("Code index queue paused") + queue.Pause() + } else { + log.Info("Code index queue resumed") + queue.Resume() + } + }) + } + // Start processing the queue go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run) @@ -262,6 +281,17 @@ func UpdateRepoIndexer(repo *repo_model.Repository) { } } +// IsAvailable checks if issue indexer is available +func IsAvailable() bool { + idx, err := indexer.get() + if err != nil { + log.Error("IsAvailable(): unable to get indexer: %v", err) + return false + } + + return idx.Ping() +} + // populateRepoIndexer populate the repo indexer with pre-existing data. This // should only be run when the indexer is created for the first time. func populateRepoIndexer(ctx context.Context) { diff --git a/modules/indexer/code/indexer_test.go b/modules/indexer/code/indexer_test.go index 0f9915c84b..d56c33653f 100644 --- a/modules/indexer/code/indexer_test.go +++ b/modules/indexer/code/indexer_test.go @@ -5,6 +5,7 @@ package code import ( + "context" "path/filepath" "testing" @@ -65,7 +66,7 @@ func testIndexer(name string, t *testing.T, indexer Indexer) { for _, kw := range keywords { t.Run(kw.Keyword, func(t *testing.T) { - total, res, langs, err := indexer.Search(kw.RepoIDs, "", kw.Keyword, 1, 10, false) + total, res, langs, err := indexer.Search(context.TODO(), kw.RepoIDs, "", kw.Keyword, 1, 10, false) assert.NoError(t, err) assert.EqualValues(t, len(kw.IDs), total) assert.Len(t, langs, kw.Langs) diff --git a/modules/indexer/code/search.go b/modules/indexer/code/search.go index bb8dcf16b3..bb7715bafc 100644 --- a/modules/indexer/code/search.go +++ b/modules/indexer/code/search.go @@ -6,6 +6,7 @@ package code import ( "bytes" + "context" "strings" "code.gitea.io/gitea/modules/highlight" @@ -106,12 +107,12 @@ func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, erro } // PerformSearch perform a search on a repository -func PerformSearch(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) { +func PerformSearch(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) { if len(keyword) == 0 { return 0, nil, nil, nil } - total, results, resultLanguages, err := indexer.Search(repoIDs, language, keyword, page, pageSize, isMatch) + total, results, resultLanguages, err := indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) if err != nil { return 0, nil, nil, err } diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go index 56baadd6fc..ba58236fba 100644 --- a/modules/indexer/code/wrapped.go +++ b/modules/indexer/code/wrapped.go @@ -10,6 +10,7 @@ import ( "sync" repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/log" ) var indexer = newWrappedIndexer() @@ -56,6 +57,26 @@ func (w *wrappedIndexer) get() (Indexer, error) { return w.internal, nil } +// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes +func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) { + indexer, err := w.get() + if err != nil { + log.Error("Failed to get indexer: %v", err) + return + } + indexer.SetAvailabilityChangeCallback(callback) +} + +// Ping checks if elastic is available +func (w *wrappedIndexer) Ping() bool { + indexer, err := w.get() + if err != nil { + log.Warn("Failed to get indexer: %v", err) + return false + } + return indexer.Ping() +} + func (w *wrappedIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { indexer, err := w.get() if err != nil { @@ -72,12 +93,12 @@ func (w *wrappedIndexer) Delete(repoID int64) error { return indexer.Delete(repoID) } -func (w *wrappedIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { +func (w *wrappedIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { indexer, err := w.get() if err != nil { return 0, nil, nil, err } - return indexer.Search(repoIDs, language, keyword, page, pageSize, isMatch) + return indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) } func (w *wrappedIndexer) Close() { |