diff options
Diffstat (limited to 'modules/indexer/code')
-rw-r--r-- | modules/indexer/code/bleve/bleve.go (renamed from modules/indexer/code/bleve.go) | 156 | ||||
-rw-r--r-- | modules/indexer/code/bleve_test.go | 30 | ||||
-rw-r--r-- | modules/indexer/code/elastic_search_test.go | 41 | ||||
-rw-r--r-- | modules/indexer/code/elasticsearch/elasticsearch.go (renamed from modules/indexer/code/elastic_search.go) | 252 | ||||
-rw-r--r-- | modules/indexer/code/elasticsearch/elasticsearch_test.go | 16 | ||||
-rw-r--r-- | modules/indexer/code/git.go | 32 | ||||
-rw-r--r-- | modules/indexer/code/indexer.go | 145 | ||||
-rw-r--r-- | modules/indexer/code/indexer_test.go | 50 | ||||
-rw-r--r-- | modules/indexer/code/internal/indexer.go | 43 | ||||
-rw-r--r-- | modules/indexer/code/internal/model.go | 44 | ||||
-rw-r--r-- | modules/indexer/code/internal/util.go | 32 | ||||
-rw-r--r-- | modules/indexer/code/search.go | 9 | ||||
-rw-r--r-- | modules/indexer/code/wrapped.go | 104 |
13 files changed, 331 insertions, 623 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve/bleve.go index 5936613e3a..33cc4e02b5 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve/bleve.go @@ -1,14 +1,13 @@ // Copyright 2019 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT -package code +package bleve import ( "bufio" "context" "fmt" "io" - "os" "strconv" "strings" "time" @@ -17,12 +16,13 @@ import ( "code.gitea.io/gitea/modules/analyze" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" - gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve" + "code.gitea.io/gitea/modules/indexer/code/internal" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_bleve "code.gitea.io/gitea/modules/indexer/internal/bleve" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/typesniffer" - "code.gitea.io/gitea/modules/util" "github.com/blevesearch/bleve/v2" analyzer_custom "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" @@ -31,10 +31,8 @@ import ( "github.com/blevesearch/bleve/v2/analysis/token/lowercase" "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" - "github.com/blevesearch/bleve/v2/index/upsidedown" "github.com/blevesearch/bleve/v2/mapping" "github.com/blevesearch/bleve/v2/search/query" - "github.com/ethantkoenig/rupture" "github.com/go-enry/go-enry/v2" ) @@ -59,38 +57,6 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { }) } -// openBleveIndexer open the index at the specified path, checking for metadata -// updates and bleve version updates. If index needs to be created (or -// re-created), returns (nil, nil) -func openBleveIndexer(path string, latestVersion int) (bleve.Index, error) { - _, err := os.Stat(path) - if err != nil && os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, err - } - - metadata, err := rupture.ReadIndexMetadata(path) - if err != nil { - return nil, err - } - if metadata.Version < latestVersion { - // the indexer is using a previous version, so we should delete it and - // re-populate - return nil, util.RemoveAll(path) - } - - index, err := bleve.Open(path) - if err != nil && err == upsidedown.IncompatibleVersion { - // the indexer was built with a previous version of bleve, so we should - // delete it and re-populate - return nil, util.RemoveAll(path) - } else if err != nil { - return nil, err - } - return index, nil -} - // RepoIndexerData data stored in the repo indexer type RepoIndexerData struct { RepoID int64 @@ -111,8 +77,8 @@ const ( repoIndexerLatestVersion = 6 ) -// createBleveIndexer create a bleve repo indexer if one does not already exist -func createBleveIndexer(path string, latestVersion int) (bleve.Index, error) { +// generateBleveIndexMapping generates a bleve index mapping for the repo indexer +func generateBleveIndexMapping() (mapping.IndexMapping, error) { docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -147,42 +113,28 @@ func createBleveIndexer(path string, latestVersion int) (bleve.Index, error) { mapping.AddDocumentMapping(repoIndexerDocType, docMapping) mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - indexer, err := bleve.New(path, mapping) - if err != nil { - return nil, err - } - - if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ - Version: latestVersion, - }); err != nil { - return nil, err - } - return indexer, nil + return mapping, nil } -var _ Indexer = &BleveIndexer{} +var _ internal.Indexer = &Indexer{} -// BleveIndexer represents a bleve indexer implementation -type BleveIndexer struct { - indexDir string - indexer bleve.Index +// Indexer represents a bleve indexer implementation +type Indexer struct { + inner *inner_bleve.Indexer + indexer_internal.Indexer // do not composite inner_bleve.Indexer directly to avoid exposing too much } -// NewBleveIndexer creates a new bleve local indexer -func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { - indexer := &BleveIndexer{ - indexDir: indexDir, +// NewIndexer creates a new bleve local indexer +func NewIndexer(indexDir string) *Indexer { + inner := inner_bleve.NewIndexer(indexDir, repoIndexerLatestVersion, generateBleveIndexMapping) + return &Indexer{ + Indexer: inner, + inner: inner, } - created, err := indexer.init() - if err != nil { - indexer.Close() - return nil, false, err - } - return indexer, created, err } -func (b *BleveIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, - update fileUpdate, repo *repo_model.Repository, batch *gitea_bleve.FlushingBatch, +func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, + update internal.FileUpdate, repo *repo_model.Repository, batch *inner_bleve.FlushingBatch, ) error { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { @@ -227,7 +179,7 @@ func (b *BleveIndexer) addUpdate(ctx context.Context, batchWriter git.WriteClose if _, err = batchReader.Discard(1); err != nil { return err } - id := filenameIndexerID(repo.ID, update.Filename) + id := internal.FilenameIndexerID(repo.ID, update.Filename) return batch.Index(id, &RepoIndexerData{ RepoID: repo.ID, CommitID: commitSha, @@ -237,50 +189,14 @@ func (b *BleveIndexer) addUpdate(ctx context.Context, batchWriter git.WriteClose }) } -func (b *BleveIndexer) addDelete(filename string, repo *repo_model.Repository, batch *gitea_bleve.FlushingBatch) error { - id := filenameIndexerID(repo.ID, filename) +func (b *Indexer) addDelete(filename string, repo *repo_model.Repository, batch *inner_bleve.FlushingBatch) error { + id := internal.FilenameIndexerID(repo.ID, filename) return batch.Delete(id) } -// init init the indexer -func (b *BleveIndexer) init() (bool, error) { - var err error - b.indexer, err = openBleveIndexer(b.indexDir, repoIndexerLatestVersion) - if err != nil { - return false, err - } - if b.indexer != nil { - return false, nil - } - - b.indexer, err = createBleveIndexer(b.indexDir, repoIndexerLatestVersion) - if err != nil { - return false, err - } - - return true, nil -} - -// Close close the indexer -func (b *BleveIndexer) Close() { - log.Debug("Closing repo indexer") - if b.indexer != nil { - err := b.indexer.Close() - if err != nil { - log.Error("Error whilst closing the repository indexer: %v", err) - } - } - log.Info("PID: %d Repository Indexer closed", os.Getpid()) -} - -// Ping does nothing -func (b *BleveIndexer) Ping() bool { - return true -} - // Index indexes the data -func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { - batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) +func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error { + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) if len(changes.Updates) > 0 { // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first! @@ -308,14 +224,14 @@ func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, s } // Delete deletes indexes by ids -func (b *BleveIndexer) Delete(repoID int64) error { +func (b *Indexer) Delete(_ context.Context, repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := b.indexer.Search(searchRequest) + result, err := b.inner.Indexer.Search(searchRequest) if err != nil { return err } - batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) for _, hit := range result.Hits { if err = batch.Delete(hit.ID); err != nil { return err @@ -326,7 +242,7 @@ func (b *BleveIndexer) Delete(repoID int64) error { // Search searches for files in the specified repo. // Returns the matching file-paths -func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { +func (b *Indexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { var ( indexerQuery query.Query keywordQuery query.Query @@ -379,14 +295,14 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke searchRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10)) } - result, err := b.indexer.SearchInContext(ctx, searchRequest) + result, err := b.inner.Indexer.SearchInContext(ctx, searchRequest) if err != nil { return 0, nil, nil, err } total := int64(result.Total) - searchResults := make([]*SearchResult, len(result.Hits)) + searchResults := make([]*internal.SearchResult, len(result.Hits)) for i, hit := range result.Hits { startIndex, endIndex := -1, -1 for _, locations := range hit.Locations["Content"] { @@ -405,11 +321,11 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke if t, err := time.Parse(time.RFC3339, hit.Fields["UpdatedAt"].(string)); err == nil { updatedUnix = timeutil.TimeStamp(t.Unix()) } - searchResults[i] = &SearchResult{ + searchResults[i] = &internal.SearchResult{ RepoID: int64(hit.Fields["RepoID"].(float64)), StartIndex: startIndex, EndIndex: endIndex, - Filename: filenameOfIndexerID(hit.ID), + Filename: internal.FilenameOfIndexerID(hit.ID), Content: hit.Fields["Content"].(string), CommitID: hit.Fields["CommitID"].(string), UpdatedUnix: updatedUnix, @@ -418,7 +334,7 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke } } - searchResultLanguages := make([]*SearchResultLanguages, 0, 10) + searchResultLanguages := make([]*internal.SearchResultLanguages, 0, 10) if len(language) > 0 { // Use separate query to go get all language counts facetRequest := bleve.NewSearchRequestOptions(facetQuery, 1, 0, false) @@ -426,7 +342,7 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke facetRequest.IncludeLocations = true facetRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10)) - if result, err = b.indexer.Search(facetRequest); err != nil { + if result, err = b.inner.Indexer.Search(facetRequest); err != nil { return 0, nil, nil, err } @@ -436,7 +352,7 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke if len(term.Term) == 0 { continue } - searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{ + searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{ Language: term.Term, Color: enry.GetColor(term.Term), Count: term.Count, diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go deleted file mode 100644 index 00bcd5c90c..0000000000 --- a/modules/indexer/code/bleve_test.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package code - -import ( - "testing" - - "code.gitea.io/gitea/models/unittest" - - "github.com/stretchr/testify/assert" -) - -func TestBleveIndexAndSearch(t *testing.T) { - unittest.PrepareTestEnv(t) - - dir := t.TempDir() - - idx, _, err := NewBleveIndexer(dir) - if err != nil { - assert.Fail(t, "Unable to create bleve indexer Error: %v", err) - if idx != nil { - idx.Close() - } - return - } - defer idx.Close() - - testIndexer("beleve", t, idx) -} diff --git a/modules/indexer/code/elastic_search_test.go b/modules/indexer/code/elastic_search_test.go deleted file mode 100644 index e7506eefa6..0000000000 --- a/modules/indexer/code/elastic_search_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package code - -import ( - "os" - "testing" - - "code.gitea.io/gitea/models/unittest" - - "github.com/stretchr/testify/assert" -) - -func TestESIndexAndSearch(t *testing.T) { - unittest.PrepareTestEnv(t) - - u := os.Getenv("TEST_INDEXER_CODE_ES_URL") - if u == "" { - t.SkipNow() - return - } - - indexer, _, err := NewElasticSearchIndexer(u, "gitea_codes") - if err != nil { - assert.Fail(t, "Unable to create ES indexer Error: %v", err) - if indexer != nil { - indexer.Close() - } - return - } - defer indexer.Close() - - testIndexer("elastic_search", t, indexer) -} - -func TestIndexPos(t *testing.T) { - startIdx, endIdx := indexPos("test index start and end", "start", "end") - assert.EqualValues(t, 11, startIdx) - assert.EqualValues(t, 24, endIdx) -} diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 0e56a86588..88054585cd 100644 --- a/modules/indexer/code/elastic_search.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -1,25 +1,23 @@ // Copyright 2020 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT -package code +package elasticsearch import ( "bufio" "context" - "errors" "fmt" "io" - "net" "strconv" "strings" - "sync" - "time" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/analyze" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/code/internal" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -38,63 +36,22 @@ const ( esMultiMatchTypePhrasePrefix = "phrase_prefix" ) -var _ Indexer = &ElasticSearchIndexer{} +var _ internal.Indexer = &Indexer{} -// ElasticSearchIndexer implements Indexer interface -type ElasticSearchIndexer struct { - client *elastic.Client - indexerAliasName string - available bool - stopTimer chan struct{} - lock sync.RWMutex +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_elasticsearch.Indexer + indexer_internal.Indexer // do not composite inner_elasticsearch.Indexer directly to avoid exposing too much } -// NewElasticSearchIndexer creates a new elasticsearch indexer -func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bool, error) { - opts := []elastic.ClientOptionFunc{ - elastic.SetURL(url), - elastic.SetSniff(false), - elastic.SetHealthcheckInterval(10 * time.Second), - elastic.SetGzip(false), +// NewIndexer creates a new elasticsearch indexer +func NewIndexer(url, indexerName string) *Indexer { + inner := inner_elasticsearch.NewIndexer(url, indexerName, esRepoIndexerLatestVersion, defaultMapping) + indexer := &Indexer{ + inner: inner, + Indexer: inner, } - - logger := log.GetLogger(log.DEFAULT) - - opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace})) - opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info})) - opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error})) - - client, err := elastic.NewClient(opts...) - if err != nil { - return nil, false, err - } - - indexer := &ElasticSearchIndexer{ - client: client, - indexerAliasName: indexerName, - available: true, - stopTimer: make(chan struct{}), - } - - ticker := time.NewTicker(10 * time.Second) - go func() { - for { - select { - case <-ticker.C: - indexer.checkAvailability() - case <-indexer.stopTimer: - ticker.Stop() - return - } - } - }() - - exists, err := indexer.init() - if err != nil { - indexer.Close() - return nil, false, err - } - return indexer, !exists, err + return indexer } const ( @@ -127,72 +84,7 @@ const ( }` ) -func (b *ElasticSearchIndexer) realIndexerName() string { - return fmt.Sprintf("%s.v%d", b.indexerAliasName, esRepoIndexerLatestVersion) -} - -// Init will initialize the indexer -func (b *ElasticSearchIndexer) init() (bool, error) { - ctx := graceful.GetManager().HammerContext() - exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !exists { - mapping := defaultMapping - - createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !createIndex.Acknowledged { - return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping) - } - } - - // check version - r, err := b.client.Aliases().Do(ctx) - if err != nil { - return false, b.checkError(err) - } - - realIndexerNames := r.IndicesByAlias(b.indexerAliasName) - if len(realIndexerNames) < 1 { - res, err := b.client.Alias(). - Add(b.realIndexerName(), b.indexerAliasName). - Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !res.Acknowledged { - return false, fmt.Errorf("create alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) - } - } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() { - log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", - realIndexerNames[0], b.realIndexerName()) - res, err := b.client.Alias(). - Remove(realIndexerNames[0], b.indexerAliasName). - Add(b.realIndexerName(), b.indexerAliasName). - Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !res.Acknowledged { - return false, fmt.Errorf("change alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) - } - } - - return exists, nil -} - -// Ping checks if elastic is available -func (b *ElasticSearchIndexer) Ping() bool { - b.lock.RLock() - defer b.lock.RUnlock() - return b.available -} - -func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { +func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update internal.FileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { return nil, nil @@ -235,11 +127,11 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr if _, err = batchReader.Discard(1); err != nil { return nil, err } - id := filenameIndexerID(repo.ID, update.Filename) + id := internal.FilenameIndexerID(repo.ID, update.Filename) return []elastic.BulkableRequest{ elastic.NewBulkIndexRequest(). - Index(b.indexerAliasName). + Index(b.inner.VersionedIndexName()). Id(id). Doc(map[string]interface{}{ "repo_id": repo.ID, @@ -251,15 +143,15 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr }, nil } -func (b *ElasticSearchIndexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest { - id := filenameIndexerID(repo.ID, filename) +func (b *Indexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest { + id := internal.FilenameIndexerID(repo.ID, filename) return elastic.NewBulkDeleteRequest(). - Index(b.indexerAliasName). + Index(b.inner.VersionedIndexName()). Id(id) } // Index will save the index data -func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { +func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error { reqs := make([]elastic.BulkableRequest, 0) if len(changes.Updates) > 0 { // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first! @@ -288,21 +180,21 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos } if len(reqs) > 0 { - _, err := b.client.Bulk(). - Index(b.indexerAliasName). + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). Add(reqs...). Do(ctx) - return b.checkError(err) + return err } return nil } // Delete deletes indexes by ids -func (b *ElasticSearchIndexer) Delete(repoID int64) error { - _, err := b.client.DeleteByQuery(b.indexerAliasName). +func (b *Indexer) Delete(ctx context.Context, repoID int64) error { + _, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()). Query(elastic.NewTermsQuery("repo_id", repoID)). - Do(graceful.GetManager().HammerContext()) - return b.checkError(err) + Do(ctx) + return err } // indexPos find words positions for start and the following end on content. It will @@ -321,8 +213,8 @@ func indexPos(content, start, end string) (int, int) { return startIdx, startIdx + len(start) + endIdx + len(end) } -func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) { - hits := make([]*SearchResult, 0, pageSize) +func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { + hits := make([]*internal.SearchResult, 0, pageSize) for _, hit := range searchResult.Hits.Hits { // FIXME: There is no way to get the position the keyword on the content currently on the same request. // So we get it from content, this may made the query slower. See @@ -341,7 +233,7 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) panic(fmt.Sprintf("2===%#v", hit.Highlight)) } - repoID, fileName := parseIndexerID(hit.Id) + repoID, fileName := internal.ParseIndexerID(hit.Id) res := make(map[string]interface{}) if err := json.Unmarshal(hit.Source, &res); err != nil { return 0, nil, nil, err @@ -349,7 +241,7 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) language := res["language"].(string) - hits = append(hits, &SearchResult{ + hits = append(hits, &internal.SearchResult{ RepoID: repoID, Filename: fileName, CommitID: res["commit_id"].(string), @@ -365,14 +257,14 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) return searchResult.TotalHits(), hits, extractAggs(searchResult), nil } -func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages { - var searchResultLanguages []*SearchResultLanguages +func extractAggs(searchResult *elastic.SearchResult) []*internal.SearchResultLanguages { + var searchResultLanguages []*internal.SearchResultLanguages agg, found := searchResult.Aggregations.Terms("language") if found { - searchResultLanguages = make([]*SearchResultLanguages, 0, 10) + searchResultLanguages = make([]*internal.SearchResultLanguages, 0, 10) for _, bucket := range agg.Buckets { - searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{ + searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{ Language: bucket.Key.(string), Color: enry.GetColor(bucket.Key.(string)), Count: int(bucket.DocCount), @@ -383,7 +275,7 @@ func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages { } // Search searches for codes and language stats by given conditions. -func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { +func (b *Indexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { searchType := esMultiMatchTypeBestFields if isMatch { searchType = esMultiMatchTypePhrasePrefix @@ -412,8 +304,8 @@ func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, lang } if len(language) == 0 { - searchResult, err := b.client.Search(). - Index(b.indexerAliasName). + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). Aggregation("language", aggregation). Query(query). Highlight( @@ -426,26 +318,26 @@ func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, lang From(start).Size(pageSize). Do(ctx) if err != nil { - return 0, nil, nil, b.checkError(err) + return 0, nil, nil, err } return convertResult(searchResult, kw, pageSize) } langQuery := elastic.NewMatchQuery("language", language) - countResult, err := b.client.Search(). - Index(b.indexerAliasName). + countResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). Aggregation("language", aggregation). Query(query). - Size(0). // We only needs stats information + Size(0). // We only need stats information Do(ctx) if err != nil { - return 0, nil, nil, b.checkError(err) + return 0, nil, nil, err } query = query.Must(langQuery) - searchResult, err := b.client.Search(). - Index(b.indexerAliasName). + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). Query(query). Highlight( elastic.NewHighlight(). @@ -457,56 +349,10 @@ func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, lang From(start).Size(pageSize). Do(ctx) if err != nil { - return 0, nil, nil, b.checkError(err) + return 0, nil, nil, err } total, hits, _, err := convertResult(searchResult, kw, pageSize) return total, hits, extractAggs(countResult), err } - -// Close implements indexer -func (b *ElasticSearchIndexer) Close() { - select { - case <-b.stopTimer: - default: - close(b.stopTimer) - } -} - -func (b *ElasticSearchIndexer) checkError(err error) error { - var opErr *net.OpError - if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) { - return err - } - - b.setAvailability(false) - - return err -} - -func (b *ElasticSearchIndexer) checkAvailability() { - if b.Ping() { - return - } - - // Request cluster state to check if elastic is available again - _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext()) - if err != nil { - b.setAvailability(false) - return - } - - b.setAvailability(true) -} - -func (b *ElasticSearchIndexer) setAvailability(available bool) { - b.lock.Lock() - defer b.lock.Unlock() - - if b.available == available { - return - } - - b.available = available -} diff --git a/modules/indexer/code/elasticsearch/elasticsearch_test.go b/modules/indexer/code/elasticsearch/elasticsearch_test.go new file mode 100644 index 0000000000..c6ba93e76d --- /dev/null +++ b/modules/indexer/code/elasticsearch/elasticsearch_test.go @@ -0,0 +1,16 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexPos(t *testing.T) { + startIdx, endIdx := indexPos("test index start and end", "start", "end") + assert.EqualValues(t, 11, startIdx) + assert.EqualValues(t, 24, endIdx) +} diff --git a/modules/indexer/code/git.go b/modules/indexer/code/git.go index bbcc6ba487..1ba6b849d1 100644 --- a/modules/indexer/code/git.go +++ b/modules/indexer/code/git.go @@ -10,23 +10,11 @@ import ( repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" ) -type fileUpdate struct { - Filename string - BlobSha string - Size int64 - Sized bool -} - -// repoChanges changes (file additions/updates/removals) to a repo -type repoChanges struct { - Updates []fileUpdate - RemovedFilenames []string -} - func getDefaultBranchSha(ctx context.Context, repo *repo_model.Repository) (string, error) { stdout, _, err := git.NewCommand(ctx, "show-ref", "-s").AddDynamicArguments(git.BranchPrefix + repo.DefaultBranch).RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) if err != nil { @@ -36,7 +24,7 @@ func getDefaultBranchSha(ctx context.Context, repo *repo_model.Repository) (stri } // getRepoChanges returns changes to repo since last indexer update -func getRepoChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*repoChanges, error) { +func getRepoChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { status, err := repo_model.GetIndexerStatus(ctx, repo, repo_model.RepoIndexerTypeCode) if err != nil { return nil, err @@ -67,16 +55,16 @@ func isIndexable(entry *git.TreeEntry) bool { } // parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command -func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { +func parseGitLsTreeOutput(stdout []byte) ([]internal.FileUpdate, error) { entries, err := git.ParseTreeEntries(stdout) if err != nil { return nil, err } idxCount := 0 - updates := make([]fileUpdate, len(entries)) + updates := make([]internal.FileUpdate, len(entries)) for _, entry := range entries { if isIndexable(entry) { - updates[idxCount] = fileUpdate{ + updates[idxCount] = internal.FileUpdate{ Filename: entry.Name(), BlobSha: entry.ID.String(), Size: entry.Size(), @@ -89,8 +77,8 @@ func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { } // genesisChanges get changes to add repo to the indexer for the first time -func genesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*repoChanges, error) { - var changes repoChanges +func genesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { + var changes internal.RepoChanges stdout, _, runErr := git.NewCommand(ctx, "ls-tree", "--full-tree", "-l", "-r").AddDynamicArguments(revision).RunStdBytes(&git.RunOpts{Dir: repo.RepoPath()}) if runErr != nil { return nil, runErr @@ -102,20 +90,20 @@ func genesisChanges(ctx context.Context, repo *repo_model.Repository, revision s } // nonGenesisChanges get changes since the previous indexer update -func nonGenesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*repoChanges, error) { +func nonGenesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { diffCmd := git.NewCommand(ctx, "diff", "--name-status").AddDynamicArguments(repo.CodeIndexerStatus.CommitSha, revision) stdout, _, runErr := diffCmd.RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) if runErr != nil { // previous commit sha may have been removed by a force push, so // try rebuilding from scratch log.Warn("git diff: %v", runErr) - if err := indexer.Delete(repo.ID); err != nil { + if err := (*globalIndexer.Load()).Delete(ctx, repo.ID); err != nil { return nil, err } return genesisChanges(ctx, repo, revision) } - var changes repoChanges + var changes internal.RepoChanges var err error updatedFilenames := make([]string, 0, 10) for _, line := range strings.Split(stdout, "\n") { diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index f38fd6000c..13d06874c9 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -7,86 +7,41 @@ import ( "context" "os" "runtime/pprof" - "strconv" - "strings" + "sync/atomic" "time" "code.gitea.io/gitea/models/db" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/code/bleve" + "code.gitea.io/gitea/modules/indexer/code/elasticsearch" + "code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" ) -// SearchResult result of performing a search in a repo -type SearchResult struct { - RepoID int64 - StartIndex int - EndIndex int - Filename string - Content string - CommitID string - UpdatedUnix timeutil.TimeStamp - Language string - Color string -} - -// SearchResultLanguages result of top languages count in search results -type SearchResultLanguages struct { - Language string - Color string - Count int -} - -// Indexer defines an interface to index and search code contents -type Indexer interface { - Ping() bool - Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error - Delete(repoID int64) error - Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) - Close() -} - -func filenameIndexerID(repoID int64, filename string) string { - return indexerID(repoID) + "_" + filename -} - -func indexerID(id int64) string { - return strconv.FormatInt(id, 36) -} - -func parseIndexerID(indexerID string) (int64, string) { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - repoID, _ := strconv.ParseInt(indexerID[:index], 36, 64) - return repoID, indexerID[index+1:] -} - -func filenameOfIndexerID(indexerID string) string { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - return indexerID[index+1:] -} +var ( + indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData] + // globalIndexer is the global indexer, it cannot be nil. + // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. + // So it's always safe use it as *globalIndexer.Load() and call its methods. + globalIndexer atomic.Pointer[internal.Indexer] + dummyIndexer *internal.Indexer +) -// IndexerData represents data stored in the code indexer -type IndexerData struct { - RepoID int64 +func init() { + i := internal.NewDummyIndexer() + dummyIndexer = &i + globalIndexer.Store(dummyIndexer) } -var indexerQueue *queue.WorkerPoolQueue[*IndexerData] - -func index(ctx context.Context, indexer Indexer, repoID int64) error { +func index(ctx context.Context, indexer internal.Indexer, repoID int64) error { repo, err := repo_model.GetRepositoryByID(ctx, repoID) if repo_model.IsErrRepoNotExist(err) { - return indexer.Delete(repoID) + return indexer.Delete(ctx, repoID) } if err != nil { return err @@ -139,7 +94,7 @@ func index(ctx context.Context, indexer Indexer, repoID int64) error { // Init initialize the repo indexer func Init() { if !setting.Indexer.RepoIndexerEnabled { - indexer.Close() + (*globalIndexer.Load()).Close() return } @@ -153,7 +108,7 @@ func Init() { } cancel() log.Debug("Closing repository indexer") - indexer.Close() + (*globalIndexer.Load()).Close() log.Info("PID: %d Repository Indexer closed", os.Getpid()) finished() }) @@ -163,13 +118,8 @@ func Init() { // Create the Queue switch setting.Indexer.RepoType { case "bleve", "elasticsearch": - handler := func(items ...*IndexerData) (unhandled []*IndexerData) { - idx, err := indexer.get() - if idx == nil || err != nil { - log.Warn("Codes indexer handler: indexer is not ready, retry later.") - return items - } - + handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) { + indexer := *globalIndexer.Load() for _, indexerData := range items { log.Trace("IndexerData Process Repo: %d", indexerData.RepoID) @@ -188,11 +138,7 @@ func Init() { code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105) */ if err := index(ctx, indexer, indexerData.RepoID); err != nil { - if !idx.Ping() { - log.Error("Code indexer handler: indexer is unavailable.") - unhandled = append(unhandled, indexerData) - continue - } + unhandled = append(unhandled, indexerData) if !setting.IsInTesting { log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err) } @@ -213,8 +159,8 @@ func Init() { pprof.SetGoroutineLabels(ctx) start := time.Now() var ( - rIndexer Indexer - populate bool + rIndexer internal.Indexer + existed bool err error ) switch setting.Indexer.RepoType { @@ -228,10 +174,11 @@ func Init() { } }() - rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath) + rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath) + existed, err = rIndexer.Init(ctx) if err != nil { cancel() - indexer.Close() + (*globalIndexer.Load()).Close() close(waitChannel) log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) } @@ -245,23 +192,31 @@ func Init() { } }() - rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + if err != nil { + cancel() + (*globalIndexer.Load()).Close() + close(waitChannel) + log.Fatal("PID: %d Unable to create the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) + } + existed, err = rIndexer.Init(ctx) if err != nil { cancel() - indexer.Close() + (*globalIndexer.Load()).Close() close(waitChannel) log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) } + default: log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType) } - indexer.set(rIndexer) + globalIndexer.Store(&rIndexer) // Start processing the queue go graceful.GetManager().RunWithCancel(indexerQueue) - if populate { + if !existed { // populate the index because it's created for the first time go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) } select { @@ -283,18 +238,18 @@ func Init() { case <-graceful.GetManager().IsShutdown(): log.Warn("Shutdown before Repository Indexer completed initialization") cancel() - indexer.Close() + (*globalIndexer.Load()).Close() case duration, ok := <-waitChannel: if !ok { log.Warn("Repository Indexer Initialization failed") cancel() - indexer.Close() + (*globalIndexer.Load()).Close() return } log.Info("Repository Indexer Initialization took %v", duration) case <-time.After(timeout): cancel() - indexer.Close() + (*globalIndexer.Load()).Close() log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) } }() @@ -303,21 +258,15 @@ func Init() { // UpdateRepoIndexer update a repository's entries in the indexer func UpdateRepoIndexer(repo *repo_model.Repository) { - indexData := &IndexerData{RepoID: repo.ID} + indexData := &internal.IndexerData{RepoID: repo.ID} if err := indexerQueue.Push(indexData); err != nil { log.Error("Update repo index data %v failed: %v", indexData, err) } } // IsAvailable checks if issue indexer is available -func IsAvailable() bool { - idx, err := indexer.get() - if err != nil { - log.Error("IsAvailable(): unable to get indexer: %v", err) - return false - } - - return idx.Ping() +func IsAvailable(ctx context.Context) bool { + return (*globalIndexer.Load()).Ping(ctx) == nil } // populateRepoIndexer populate the repo indexer with pre-existing data. This @@ -368,7 +317,7 @@ func populateRepoIndexer(ctx context.Context) { return default: } - if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil { + if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil { log.Error("indexerQueue.Push: %v", err) return } diff --git a/modules/indexer/code/indexer_test.go b/modules/indexer/code/indexer_test.go index 52f7e76e41..55616a0361 100644 --- a/modules/indexer/code/indexer_test.go +++ b/modules/indexer/code/indexer_test.go @@ -5,11 +5,15 @@ package code import ( "context" + "os" "path/filepath" "testing" "code.gitea.io/gitea/models/unittest" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/bleve" + "code.gitea.io/gitea/modules/indexer/code/elasticsearch" + "code.gitea.io/gitea/modules/indexer/code/internal" _ "code.gitea.io/gitea/models" @@ -22,7 +26,7 @@ func TestMain(m *testing.M) { }) } -func testIndexer(name string, t *testing.T, indexer Indexer) { +func testIndexer(name string, t *testing.T, indexer internal.Indexer) { t.Run(name, func(t *testing.T) { var repoID int64 = 1 err := index(git.DefaultContext, indexer, repoID) @@ -81,6 +85,48 @@ func testIndexer(name string, t *testing.T, indexer Indexer) { }) } - assert.NoError(t, indexer.Delete(repoID)) + assert.NoError(t, indexer.Delete(context.Background(), repoID)) }) } + +func TestBleveIndexAndSearch(t *testing.T) { + unittest.PrepareTestEnv(t) + + dir := t.TempDir() + + idx := bleve.NewIndexer(dir) + _, err := idx.Init(context.Background()) + if err != nil { + assert.Fail(t, "Unable to create bleve indexer Error: %v", err) + if idx != nil { + idx.Close() + } + return + } + defer idx.Close() + + testIndexer("beleve", t, idx) +} + +func TestESIndexAndSearch(t *testing.T) { + unittest.PrepareTestEnv(t) + + u := os.Getenv("TEST_INDEXER_CODE_ES_URL") + if u == "" { + t.SkipNow() + return + } + + indexer := elasticsearch.NewIndexer(u, "gitea_codes") + if _, err := indexer.Init(context.Background()); err != nil { + assert.Fail(t, "Unable to init ES indexer Error: %v", err) + if indexer != nil { + indexer.Close() + } + return + } + + defer indexer.Close() + + testIndexer("elastic_search", t, indexer) +} diff --git a/modules/indexer/code/internal/indexer.go b/modules/indexer/code/internal/indexer.go new file mode 100644 index 0000000000..da3ac3623c --- /dev/null +++ b/modules/indexer/code/internal/indexer.go @@ -0,0 +1,43 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "context" + "fmt" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/indexer/internal" +) + +// Indexer defines an interface to index and search code contents +type Indexer interface { + internal.Indexer + Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *RepoChanges) error + Delete(ctx context.Context, repoID int64) error + Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) +} + +// NewDummyIndexer returns a dummy indexer +func NewDummyIndexer() Indexer { + return &dummyIndexer{ + Indexer: internal.NewDummyIndexer(), + } +} + +type dummyIndexer struct { + internal.Indexer +} + +func (d *dummyIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *RepoChanges) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Delete(ctx context.Context, repoID int64) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { + return 0, nil, nil, fmt.Errorf("indexer is not ready") +} diff --git a/modules/indexer/code/internal/model.go b/modules/indexer/code/internal/model.go new file mode 100644 index 0000000000..f75263c83c --- /dev/null +++ b/modules/indexer/code/internal/model.go @@ -0,0 +1,44 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import "code.gitea.io/gitea/modules/timeutil" + +type FileUpdate struct { + Filename string + BlobSha string + Size int64 + Sized bool +} + +// RepoChanges changes (file additions/updates/removals) to a repo +type RepoChanges struct { + Updates []FileUpdate + RemovedFilenames []string +} + +// IndexerData represents data stored in the code indexer +type IndexerData struct { + RepoID int64 +} + +// SearchResult result of performing a search in a repo +type SearchResult struct { + RepoID int64 + StartIndex int + EndIndex int + Filename string + Content string + CommitID string + UpdatedUnix timeutil.TimeStamp + Language string + Color string +} + +// SearchResultLanguages result of top languages count in search results +type SearchResultLanguages struct { + Language string + Color string + Count int +} diff --git a/modules/indexer/code/internal/util.go b/modules/indexer/code/internal/util.go new file mode 100644 index 0000000000..689c4f4584 --- /dev/null +++ b/modules/indexer/code/internal/util.go @@ -0,0 +1,32 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "strings" + + "code.gitea.io/gitea/modules/indexer/internal" + "code.gitea.io/gitea/modules/log" +) + +func FilenameIndexerID(repoID int64, filename string) string { + return internal.Base36(repoID) + "_" + filename +} + +func ParseIndexerID(indexerID string) (int64, string) { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + repoID, _ := internal.ParseBase36(indexerID[:index]) + return repoID, indexerID[index+1:] +} + +func FilenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} diff --git a/modules/indexer/code/search.go b/modules/indexer/code/search.go index 1de9ffc224..1f9bddff7b 100644 --- a/modules/indexer/code/search.go +++ b/modules/indexer/code/search.go @@ -9,6 +9,7 @@ import ( "strings" "code.gitea.io/gitea/modules/highlight" + "code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" ) @@ -25,6 +26,8 @@ type Result struct { FormattedLines string } +type SearchResultLanguages = internal.SearchResultLanguages + func indices(content string, selectionStartIndex, selectionEndIndex int) (int, int) { startIndex := selectionStartIndex numLinesBefore := 0 @@ -61,7 +64,7 @@ func writeStrings(buf *bytes.Buffer, strs ...string) error { return nil } -func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, error) { +func searchResult(result *internal.SearchResult, startIndex, endIndex int) (*Result, error) { startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n") var formattedLinesBuffer bytes.Buffer @@ -109,12 +112,12 @@ func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, erro } // PerformSearch perform a search on a repository -func PerformSearch(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) { +func PerformSearch(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*internal.SearchResultLanguages, error) { if len(keyword) == 0 { return 0, nil, nil, nil } - total, results, resultLanguages, err := indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) + total, results, resultLanguages, err := (*globalIndexer.Load()).Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) if err != nil { return 0, nil, nil, err } diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go deleted file mode 100644 index 7eed3e8557..0000000000 --- a/modules/indexer/code/wrapped.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package code - -import ( - "context" - "fmt" - "sync" - - repo_model "code.gitea.io/gitea/models/repo" - "code.gitea.io/gitea/modules/log" -) - -var indexer = newWrappedIndexer() - -// ErrWrappedIndexerClosed is the error returned if the indexer was closed before it was ready -var ErrWrappedIndexerClosed = fmt.Errorf("Indexer closed before ready") - -type wrappedIndexer struct { - internal Indexer - lock sync.RWMutex - cond *sync.Cond - closed bool -} - -func newWrappedIndexer() *wrappedIndexer { - w := &wrappedIndexer{} - w.cond = sync.NewCond(w.lock.RLocker()) - return w -} - -func (w *wrappedIndexer) set(indexer Indexer) { - w.lock.Lock() - defer w.lock.Unlock() - if w.closed { - // Too late! - indexer.Close() - } - w.internal = indexer - w.cond.Broadcast() -} - -func (w *wrappedIndexer) get() (Indexer, error) { - w.lock.RLock() - defer w.lock.RUnlock() - if w.internal == nil { - if w.closed { - return nil, ErrWrappedIndexerClosed - } - w.cond.Wait() - if w.closed { - return nil, ErrWrappedIndexerClosed - } - } - return w.internal, nil -} - -// Ping checks if elastic is available -func (w *wrappedIndexer) Ping() bool { - indexer, err := w.get() - if err != nil { - log.Warn("Failed to get indexer: %v", err) - return false - } - return indexer.Ping() -} - -func (w *wrappedIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { - indexer, err := w.get() - if err != nil { - return err - } - return indexer.Index(ctx, repo, sha, changes) -} - -func (w *wrappedIndexer) Delete(repoID int64) error { - indexer, err := w.get() - if err != nil { - return err - } - return indexer.Delete(repoID) -} - -func (w *wrappedIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { - indexer, err := w.get() - if err != nil { - return 0, nil, nil, err - } - return indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) -} - -func (w *wrappedIndexer) Close() { - w.lock.Lock() - defer w.lock.Unlock() - if w.closed { - return - } - w.closed = true - w.cond.Broadcast() - if w.internal != nil { - w.internal.Close() - } -} |