diff options
author | Jason Song <i@wolfogre.com> | 2023-06-23 20:37:56 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-23 12:37:56 +0000 |
commit | 375fd15fbfc29787323840688bacdfa0a9e9e414 (patch) | |
tree | 69c82fdad0fad24e25d2b6ddd2e5cb7766b9768b /modules | |
parent | b0215c40cdf9a3e46a45a3823b894998d1044cda (diff) | |
download | gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.tar.gz gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.zip |
Refactor indexer (#25174)
Refactor `modules/indexer` to make it more maintainable and to make it
easier to support more features. I'm trying to solve some issue-searching
problems, and this is a precursor to making functional changes.
Currently supported engines and their index versions:
| engines | issues | code |
| - | - | - |
| db | Just a wrapper for database queries, doesn't need version | - |
| bleve | The version of index is **2** | The version of index is **6** |
| elasticsearch | The old index has no version, will be treated as version **0** in this PR | The version of index is **1** |
| meilisearch | The old index has no version, will be treated as version **0** in this PR | - |
## Changes
### Split
Split it into multiple packages:
```text
indexer
├── internal
│ ├── bleve
│ ├── db
│ ├── elasticsearch
│ └── meilisearch
├── code
│ ├── bleve
│ ├── elasticsearch
│ └── internal
└── issues
├── bleve
├── db
├── elasticsearch
├── internal
└── meilisearch
```
- `indexer/internal`: Internal shared package for indexer.
- `indexer/internal/[engine]`: Internal shared package for each engine
(bleve/db/elasticsearch/meilisearch).
- `indexer/code`: Implementations for code indexer.
- `indexer/code/internal`: Internal shared package for code indexer.
- `indexer/code/[engine]`: Implementation via each engine for code
indexer.
- `indexer/issues`: Implementations for issues indexer.
### Deduplication
- Combine `Init/Ping/Close` for code indexer and issues indexer.
- ~Combine `issues.indexerHolder` and `code.wrappedIndexer` to
`internal.IndexHolder`.~ Remove it, use dummy indexer instead when the
indexer is not ready.
- Deduplicate the two copies of the code creating ES clients.
- Deduplicate the two copies of `indexerID()`.
### Enhancement
- [x] Support index version for elasticsearch issues indexer, the old
index without version will be treated as version 0.
- [x] Fix the spelling of `elastic_search/ElasticSearch`, it should be
`Elasticsearch`.
- [x] Improve versioning of ES index. We don't need `Aliases`:
- Gitea doesn't need aliases for "Zero Downtime" because it never deletes
old indexes.
- The old code of the issues indexer uses the original name to create the
issue index, so it's tricky to convert it to an alias.
- [x] Support index version for meilisearch issues indexer, the old
index without version will be treated as version 0.
- [x] Do "ping" only when `Ping` has been called, don't ping
periodically and cache the status.
- [x] Support the context parameter whenever possible.
- [x] Fix outdated example config.
- [x] Give up the requeue logic of issues indexer: When indexing fails,
call Ping to check if it was caused by the engine being unavailable, and
only requeue the task if the engine is unavailable.
- It is fragile and tricky, and could cause data loss (it did happen when
I was doing some tests for this PR). It also works for ES only.
- Just always requeue the failed task; if the failure is caused by bad
data, it's a bug in Gitea which should be fixed.
---------
Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules')
37 files changed, 1360 insertions, 1412 deletions
diff --git a/modules/context/repo.go b/modules/context/repo.go index fd5f208576..003309f1b0 100644 --- a/modules/context/repo.go +++ b/modules/context/repo.go @@ -593,7 +593,7 @@ func RepoAssignment(ctx *Context) (cancel context.CancelFunc) { ctx.Data["RepoSearchEnabled"] = setting.Indexer.RepoIndexerEnabled if setting.Indexer.RepoIndexerEnabled { - ctx.Data["CodeIndexerUnavailable"] = !code_indexer.IsAvailable() + ctx.Data["CodeIndexerUnavailable"] = !code_indexer.IsAvailable(ctx) } if ctx.IsSigned { diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve/bleve.go index 5936613e3a..33cc4e02b5 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve/bleve.go @@ -1,14 +1,13 @@ // Copyright 2019 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT -package code +package bleve import ( "bufio" "context" "fmt" "io" - "os" "strconv" "strings" "time" @@ -17,12 +16,13 @@ import ( "code.gitea.io/gitea/modules/analyze" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" - gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve" + "code.gitea.io/gitea/modules/indexer/code/internal" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_bleve "code.gitea.io/gitea/modules/indexer/internal/bleve" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/typesniffer" - "code.gitea.io/gitea/modules/util" "github.com/blevesearch/bleve/v2" analyzer_custom "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" @@ -31,10 +31,8 @@ import ( "github.com/blevesearch/bleve/v2/analysis/token/lowercase" "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" - "github.com/blevesearch/bleve/v2/index/upsidedown" "github.com/blevesearch/bleve/v2/mapping" "github.com/blevesearch/bleve/v2/search/query" - "github.com/ethantkoenig/rupture" 
"github.com/go-enry/go-enry/v2" ) @@ -59,38 +57,6 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { }) } -// openBleveIndexer open the index at the specified path, checking for metadata -// updates and bleve version updates. If index needs to be created (or -// re-created), returns (nil, nil) -func openBleveIndexer(path string, latestVersion int) (bleve.Index, error) { - _, err := os.Stat(path) - if err != nil && os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, err - } - - metadata, err := rupture.ReadIndexMetadata(path) - if err != nil { - return nil, err - } - if metadata.Version < latestVersion { - // the indexer is using a previous version, so we should delete it and - // re-populate - return nil, util.RemoveAll(path) - } - - index, err := bleve.Open(path) - if err != nil && err == upsidedown.IncompatibleVersion { - // the indexer was built with a previous version of bleve, so we should - // delete it and re-populate - return nil, util.RemoveAll(path) - } else if err != nil { - return nil, err - } - return index, nil -} - // RepoIndexerData data stored in the repo indexer type RepoIndexerData struct { RepoID int64 @@ -111,8 +77,8 @@ const ( repoIndexerLatestVersion = 6 ) -// createBleveIndexer create a bleve repo indexer if one does not already exist -func createBleveIndexer(path string, latestVersion int) (bleve.Index, error) { +// generateBleveIndexMapping generates a bleve index mapping for the repo indexer +func generateBleveIndexMapping() (mapping.IndexMapping, error) { docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -147,42 +113,28 @@ func createBleveIndexer(path string, latestVersion int) (bleve.Index, error) { mapping.AddDocumentMapping(repoIndexerDocType, docMapping) mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - indexer, err := bleve.New(path, mapping) - if err != nil { - return 
nil, err - } - - if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ - Version: latestVersion, - }); err != nil { - return nil, err - } - return indexer, nil + return mapping, nil } -var _ Indexer = &BleveIndexer{} +var _ internal.Indexer = &Indexer{} -// BleveIndexer represents a bleve indexer implementation -type BleveIndexer struct { - indexDir string - indexer bleve.Index +// Indexer represents a bleve indexer implementation +type Indexer struct { + inner *inner_bleve.Indexer + indexer_internal.Indexer // do not composite inner_bleve.Indexer directly to avoid exposing too much } -// NewBleveIndexer creates a new bleve local indexer -func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { - indexer := &BleveIndexer{ - indexDir: indexDir, +// NewIndexer creates a new bleve local indexer +func NewIndexer(indexDir string) *Indexer { + inner := inner_bleve.NewIndexer(indexDir, repoIndexerLatestVersion, generateBleveIndexMapping) + return &Indexer{ + Indexer: inner, + inner: inner, } - created, err := indexer.init() - if err != nil { - indexer.Close() - return nil, false, err - } - return indexer, created, err } -func (b *BleveIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, - update fileUpdate, repo *repo_model.Repository, batch *gitea_bleve.FlushingBatch, +func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, + update internal.FileUpdate, repo *repo_model.Repository, batch *inner_bleve.FlushingBatch, ) error { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { @@ -227,7 +179,7 @@ func (b *BleveIndexer) addUpdate(ctx context.Context, batchWriter git.WriteClose if _, err = batchReader.Discard(1); err != nil { return err } - id := filenameIndexerID(repo.ID, update.Filename) + id := internal.FilenameIndexerID(repo.ID, update.Filename) 
return batch.Index(id, &RepoIndexerData{ RepoID: repo.ID, CommitID: commitSha, @@ -237,50 +189,14 @@ func (b *BleveIndexer) addUpdate(ctx context.Context, batchWriter git.WriteClose }) } -func (b *BleveIndexer) addDelete(filename string, repo *repo_model.Repository, batch *gitea_bleve.FlushingBatch) error { - id := filenameIndexerID(repo.ID, filename) +func (b *Indexer) addDelete(filename string, repo *repo_model.Repository, batch *inner_bleve.FlushingBatch) error { + id := internal.FilenameIndexerID(repo.ID, filename) return batch.Delete(id) } -// init init the indexer -func (b *BleveIndexer) init() (bool, error) { - var err error - b.indexer, err = openBleveIndexer(b.indexDir, repoIndexerLatestVersion) - if err != nil { - return false, err - } - if b.indexer != nil { - return false, nil - } - - b.indexer, err = createBleveIndexer(b.indexDir, repoIndexerLatestVersion) - if err != nil { - return false, err - } - - return true, nil -} - -// Close close the indexer -func (b *BleveIndexer) Close() { - log.Debug("Closing repo indexer") - if b.indexer != nil { - err := b.indexer.Close() - if err != nil { - log.Error("Error whilst closing the repository indexer: %v", err) - } - } - log.Info("PID: %d Repository Indexer closed", os.Getpid()) -} - -// Ping does nothing -func (b *BleveIndexer) Ping() bool { - return true -} - // Index indexes the data -func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { - batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) +func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error { + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) if len(changes.Updates) > 0 { // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first! 
@@ -308,14 +224,14 @@ func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, s } // Delete deletes indexes by ids -func (b *BleveIndexer) Delete(repoID int64) error { +func (b *Indexer) Delete(_ context.Context, repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := b.indexer.Search(searchRequest) + result, err := b.inner.Indexer.Search(searchRequest) if err != nil { return err } - batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) for _, hit := range result.Hits { if err = batch.Delete(hit.ID); err != nil { return err @@ -326,7 +242,7 @@ func (b *BleveIndexer) Delete(repoID int64) error { // Search searches for files in the specified repo. // Returns the matching file-paths -func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { +func (b *Indexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { var ( indexerQuery query.Query keywordQuery query.Query @@ -379,14 +295,14 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke searchRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10)) } - result, err := b.indexer.SearchInContext(ctx, searchRequest) + result, err := b.inner.Indexer.SearchInContext(ctx, searchRequest) if err != nil { return 0, nil, nil, err } total := int64(result.Total) - searchResults := make([]*SearchResult, len(result.Hits)) + searchResults := make([]*internal.SearchResult, len(result.Hits)) for i, hit := range result.Hits { startIndex, endIndex := -1, -1 for _, locations := range hit.Locations["Content"] { @@ -405,11 +321,11 @@ 
func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke if t, err := time.Parse(time.RFC3339, hit.Fields["UpdatedAt"].(string)); err == nil { updatedUnix = timeutil.TimeStamp(t.Unix()) } - searchResults[i] = &SearchResult{ + searchResults[i] = &internal.SearchResult{ RepoID: int64(hit.Fields["RepoID"].(float64)), StartIndex: startIndex, EndIndex: endIndex, - Filename: filenameOfIndexerID(hit.ID), + Filename: internal.FilenameOfIndexerID(hit.ID), Content: hit.Fields["Content"].(string), CommitID: hit.Fields["CommitID"].(string), UpdatedUnix: updatedUnix, @@ -418,7 +334,7 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke } } - searchResultLanguages := make([]*SearchResultLanguages, 0, 10) + searchResultLanguages := make([]*internal.SearchResultLanguages, 0, 10) if len(language) > 0 { // Use separate query to go get all language counts facetRequest := bleve.NewSearchRequestOptions(facetQuery, 1, 0, false) @@ -426,7 +342,7 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke facetRequest.IncludeLocations = true facetRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10)) - if result, err = b.indexer.Search(facetRequest); err != nil { + if result, err = b.inner.Indexer.Search(facetRequest); err != nil { return 0, nil, nil, err } @@ -436,7 +352,7 @@ func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, ke if len(term.Term) == 0 { continue } - searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{ + searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{ Language: term.Term, Color: enry.GetColor(term.Term), Count: term.Count, diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go deleted file mode 100644 index 00bcd5c90c..0000000000 --- a/modules/indexer/code/bleve_test.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. 
-// SPDX-License-Identifier: MIT - -package code - -import ( - "testing" - - "code.gitea.io/gitea/models/unittest" - - "github.com/stretchr/testify/assert" -) - -func TestBleveIndexAndSearch(t *testing.T) { - unittest.PrepareTestEnv(t) - - dir := t.TempDir() - - idx, _, err := NewBleveIndexer(dir) - if err != nil { - assert.Fail(t, "Unable to create bleve indexer Error: %v", err) - if idx != nil { - idx.Close() - } - return - } - defer idx.Close() - - testIndexer("beleve", t, idx) -} diff --git a/modules/indexer/code/elastic_search_test.go b/modules/indexer/code/elastic_search_test.go deleted file mode 100644 index e7506eefa6..0000000000 --- a/modules/indexer/code/elastic_search_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package code - -import ( - "os" - "testing" - - "code.gitea.io/gitea/models/unittest" - - "github.com/stretchr/testify/assert" -) - -func TestESIndexAndSearch(t *testing.T) { - unittest.PrepareTestEnv(t) - - u := os.Getenv("TEST_INDEXER_CODE_ES_URL") - if u == "" { - t.SkipNow() - return - } - - indexer, _, err := NewElasticSearchIndexer(u, "gitea_codes") - if err != nil { - assert.Fail(t, "Unable to create ES indexer Error: %v", err) - if indexer != nil { - indexer.Close() - } - return - } - defer indexer.Close() - - testIndexer("elastic_search", t, indexer) -} - -func TestIndexPos(t *testing.T) { - startIdx, endIdx := indexPos("test index start and end", "start", "end") - assert.EqualValues(t, 11, startIdx) - assert.EqualValues(t, 24, endIdx) -} diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elasticsearch/elasticsearch.go index 0e56a86588..88054585cd 100644 --- a/modules/indexer/code/elastic_search.go +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -1,25 +1,23 @@ // Copyright 2020 The Gitea Authors. All rights reserved. 
// SPDX-License-Identifier: MIT -package code +package elasticsearch import ( "bufio" "context" - "errors" "fmt" "io" - "net" "strconv" "strings" - "sync" - "time" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/analyze" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/code/internal" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -38,63 +36,22 @@ const ( esMultiMatchTypePhrasePrefix = "phrase_prefix" ) -var _ Indexer = &ElasticSearchIndexer{} +var _ internal.Indexer = &Indexer{} -// ElasticSearchIndexer implements Indexer interface -type ElasticSearchIndexer struct { - client *elastic.Client - indexerAliasName string - available bool - stopTimer chan struct{} - lock sync.RWMutex +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_elasticsearch.Indexer + indexer_internal.Indexer // do not composite inner_elasticsearch.Indexer directly to avoid exposing too much } -// NewElasticSearchIndexer creates a new elasticsearch indexer -func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bool, error) { - opts := []elastic.ClientOptionFunc{ - elastic.SetURL(url), - elastic.SetSniff(false), - elastic.SetHealthcheckInterval(10 * time.Second), - elastic.SetGzip(false), +// NewIndexer creates a new elasticsearch indexer +func NewIndexer(url, indexerName string) *Indexer { + inner := inner_elasticsearch.NewIndexer(url, indexerName, esRepoIndexerLatestVersion, defaultMapping) + indexer := &Indexer{ + inner: inner, + Indexer: inner, } - - logger := log.GetLogger(log.DEFAULT) - - opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace})) - opts = append(opts, 
elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info})) - opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error})) - - client, err := elastic.NewClient(opts...) - if err != nil { - return nil, false, err - } - - indexer := &ElasticSearchIndexer{ - client: client, - indexerAliasName: indexerName, - available: true, - stopTimer: make(chan struct{}), - } - - ticker := time.NewTicker(10 * time.Second) - go func() { - for { - select { - case <-ticker.C: - indexer.checkAvailability() - case <-indexer.stopTimer: - ticker.Stop() - return - } - } - }() - - exists, err := indexer.init() - if err != nil { - indexer.Close() - return nil, false, err - } - return indexer, !exists, err + return indexer } const ( @@ -127,72 +84,7 @@ const ( }` ) -func (b *ElasticSearchIndexer) realIndexerName() string { - return fmt.Sprintf("%s.v%d", b.indexerAliasName, esRepoIndexerLatestVersion) -} - -// Init will initialize the indexer -func (b *ElasticSearchIndexer) init() (bool, error) { - ctx := graceful.GetManager().HammerContext() - exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !exists { - mapping := defaultMapping - - createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !createIndex.Acknowledged { - return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping) - } - } - - // check version - r, err := b.client.Aliases().Do(ctx) - if err != nil { - return false, b.checkError(err) - } - - realIndexerNames := r.IndicesByAlias(b.indexerAliasName) - if len(realIndexerNames) < 1 { - res, err := b.client.Alias(). - Add(b.realIndexerName(), b.indexerAliasName). 
- Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !res.Acknowledged { - return false, fmt.Errorf("create alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) - } - } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() { - log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", - realIndexerNames[0], b.realIndexerName()) - res, err := b.client.Alias(). - Remove(realIndexerNames[0], b.indexerAliasName). - Add(b.realIndexerName(), b.indexerAliasName). - Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !res.Acknowledged { - return false, fmt.Errorf("change alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) - } - } - - return exists, nil -} - -// Ping checks if elastic is available -func (b *ElasticSearchIndexer) Ping() bool { - b.lock.RLock() - defer b.lock.RUnlock() - return b.available -} - -func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { +func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update internal.FileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { return nil, nil @@ -235,11 +127,11 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr if _, err = batchReader.Discard(1); err != nil { return nil, err } - id := filenameIndexerID(repo.ID, update.Filename) + id := internal.FilenameIndexerID(repo.ID, update.Filename) return []elastic.BulkableRequest{ elastic.NewBulkIndexRequest(). - Index(b.indexerAliasName). 
+ Index(b.inner.VersionedIndexName()). Id(id). Doc(map[string]interface{}{ "repo_id": repo.ID, @@ -251,15 +143,15 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr }, nil } -func (b *ElasticSearchIndexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest { - id := filenameIndexerID(repo.ID, filename) +func (b *Indexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest { + id := internal.FilenameIndexerID(repo.ID, filename) return elastic.NewBulkDeleteRequest(). - Index(b.indexerAliasName). + Index(b.inner.VersionedIndexName()). Id(id) } // Index will save the index data -func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { +func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error { reqs := make([]elastic.BulkableRequest, 0) if len(changes.Updates) > 0 { // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first! @@ -288,21 +180,21 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos } if len(reqs) > 0 { - _, err := b.client.Bulk(). - Index(b.indexerAliasName). + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). Add(reqs...). Do(ctx) - return b.checkError(err) + return err } return nil } // Delete deletes indexes by ids -func (b *ElasticSearchIndexer) Delete(repoID int64) error { - _, err := b.client.DeleteByQuery(b.indexerAliasName). +func (b *Indexer) Delete(ctx context.Context, repoID int64) error { + _, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()). Query(elastic.NewTermsQuery("repo_id", repoID)). - Do(graceful.GetManager().HammerContext()) - return b.checkError(err) + Do(ctx) + return err } // indexPos find words positions for start and the following end on content. 
It will @@ -321,8 +213,8 @@ func indexPos(content, start, end string) (int, int) { return startIdx, startIdx + len(start) + endIdx + len(end) } -func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) { - hits := make([]*SearchResult, 0, pageSize) +func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { + hits := make([]*internal.SearchResult, 0, pageSize) for _, hit := range searchResult.Hits.Hits { // FIXME: There is no way to get the position the keyword on the content currently on the same request. // So we get it from content, this may made the query slower. See @@ -341,7 +233,7 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) panic(fmt.Sprintf("2===%#v", hit.Highlight)) } - repoID, fileName := parseIndexerID(hit.Id) + repoID, fileName := internal.ParseIndexerID(hit.Id) res := make(map[string]interface{}) if err := json.Unmarshal(hit.Source, &res); err != nil { return 0, nil, nil, err @@ -349,7 +241,7 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) language := res["language"].(string) - hits = append(hits, &SearchResult{ + hits = append(hits, &internal.SearchResult{ RepoID: repoID, Filename: fileName, CommitID: res["commit_id"].(string), @@ -365,14 +257,14 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) return searchResult.TotalHits(), hits, extractAggs(searchResult), nil } -func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages { - var searchResultLanguages []*SearchResultLanguages +func extractAggs(searchResult *elastic.SearchResult) []*internal.SearchResultLanguages { + var searchResultLanguages []*internal.SearchResultLanguages agg, found := searchResult.Aggregations.Terms("language") if found { - searchResultLanguages = make([]*SearchResultLanguages, 
0, 10) + searchResultLanguages = make([]*internal.SearchResultLanguages, 0, 10) for _, bucket := range agg.Buckets { - searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{ + searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{ Language: bucket.Key.(string), Color: enry.GetColor(bucket.Key.(string)), Count: int(bucket.DocCount), @@ -383,7 +275,7 @@ func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages { } // Search searches for codes and language stats by given conditions. -func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { +func (b *Indexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { searchType := esMultiMatchTypeBestFields if isMatch { searchType = esMultiMatchTypePhrasePrefix @@ -412,8 +304,8 @@ func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, lang } if len(language) == 0 { - searchResult, err := b.client.Search(). - Index(b.indexerAliasName). + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). Aggregation("language", aggregation). Query(query). Highlight( @@ -426,26 +318,26 @@ func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, lang From(start).Size(pageSize). Do(ctx) if err != nil { - return 0, nil, nil, b.checkError(err) + return 0, nil, nil, err } return convertResult(searchResult, kw, pageSize) } langQuery := elastic.NewMatchQuery("language", language) - countResult, err := b.client.Search(). - Index(b.indexerAliasName). + countResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). Aggregation("language", aggregation). Query(query). - Size(0). // We only needs stats information + Size(0). 
// We only need stats information Do(ctx) if err != nil { - return 0, nil, nil, b.checkError(err) + return 0, nil, nil, err } query = query.Must(langQuery) - searchResult, err := b.client.Search(). - Index(b.indexerAliasName). + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). Query(query). Highlight( elastic.NewHighlight(). @@ -457,56 +349,10 @@ func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, lang From(start).Size(pageSize). Do(ctx) if err != nil { - return 0, nil, nil, b.checkError(err) + return 0, nil, nil, err } total, hits, _, err := convertResult(searchResult, kw, pageSize) return total, hits, extractAggs(countResult), err } - -// Close implements indexer -func (b *ElasticSearchIndexer) Close() { - select { - case <-b.stopTimer: - default: - close(b.stopTimer) - } -} - -func (b *ElasticSearchIndexer) checkError(err error) error { - var opErr *net.OpError - if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) { - return err - } - - b.setAvailability(false) - - return err -} - -func (b *ElasticSearchIndexer) checkAvailability() { - if b.Ping() { - return - } - - // Request cluster state to check if elastic is available again - _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext()) - if err != nil { - b.setAvailability(false) - return - } - - b.setAvailability(true) -} - -func (b *ElasticSearchIndexer) setAvailability(available bool) { - b.lock.Lock() - defer b.lock.Unlock() - - if b.available == available { - return - } - - b.available = available -} diff --git a/modules/indexer/code/elasticsearch/elasticsearch_test.go b/modules/indexer/code/elasticsearch/elasticsearch_test.go new file mode 100644 index 0000000000..c6ba93e76d --- /dev/null +++ b/modules/indexer/code/elasticsearch/elasticsearch_test.go @@ -0,0 +1,16 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexPos(t *testing.T) { + startIdx, endIdx := indexPos("test index start and end", "start", "end") + assert.EqualValues(t, 11, startIdx) + assert.EqualValues(t, 24, endIdx) +} diff --git a/modules/indexer/code/git.go b/modules/indexer/code/git.go index bbcc6ba487..1ba6b849d1 100644 --- a/modules/indexer/code/git.go +++ b/modules/indexer/code/git.go @@ -10,23 +10,11 @@ import ( repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" ) -type fileUpdate struct { - Filename string - BlobSha string - Size int64 - Sized bool -} - -// repoChanges changes (file additions/updates/removals) to a repo -type repoChanges struct { - Updates []fileUpdate - RemovedFilenames []string -} - func getDefaultBranchSha(ctx context.Context, repo *repo_model.Repository) (string, error) { stdout, _, err := git.NewCommand(ctx, "show-ref", "-s").AddDynamicArguments(git.BranchPrefix + repo.DefaultBranch).RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) if err != nil { @@ -36,7 +24,7 @@ func getDefaultBranchSha(ctx context.Context, repo *repo_model.Repository) (stri } // getRepoChanges returns changes to repo since last indexer update -func getRepoChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*repoChanges, error) { +func getRepoChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { status, err := repo_model.GetIndexerStatus(ctx, repo, repo_model.RepoIndexerTypeCode) if err != nil { return nil, err @@ -67,16 +55,16 @@ func isIndexable(entry *git.TreeEntry) bool { } // parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command -func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { +func 
parseGitLsTreeOutput(stdout []byte) ([]internal.FileUpdate, error) { entries, err := git.ParseTreeEntries(stdout) if err != nil { return nil, err } idxCount := 0 - updates := make([]fileUpdate, len(entries)) + updates := make([]internal.FileUpdate, len(entries)) for _, entry := range entries { if isIndexable(entry) { - updates[idxCount] = fileUpdate{ + updates[idxCount] = internal.FileUpdate{ Filename: entry.Name(), BlobSha: entry.ID.String(), Size: entry.Size(), @@ -89,8 +77,8 @@ func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { } // genesisChanges get changes to add repo to the indexer for the first time -func genesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*repoChanges, error) { - var changes repoChanges +func genesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { + var changes internal.RepoChanges stdout, _, runErr := git.NewCommand(ctx, "ls-tree", "--full-tree", "-l", "-r").AddDynamicArguments(revision).RunStdBytes(&git.RunOpts{Dir: repo.RepoPath()}) if runErr != nil { return nil, runErr @@ -102,20 +90,20 @@ func genesisChanges(ctx context.Context, repo *repo_model.Repository, revision s } // nonGenesisChanges get changes since the previous indexer update -func nonGenesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*repoChanges, error) { +func nonGenesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { diffCmd := git.NewCommand(ctx, "diff", "--name-status").AddDynamicArguments(repo.CodeIndexerStatus.CommitSha, revision) stdout, _, runErr := diffCmd.RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) if runErr != nil { // previous commit sha may have been removed by a force push, so // try rebuilding from scratch log.Warn("git diff: %v", runErr) - if err := indexer.Delete(repo.ID); err != nil { + if err := (*globalIndexer.Load()).Delete(ctx, repo.ID); err != nil { 
return nil, err } return genesisChanges(ctx, repo, revision) } - var changes repoChanges + var changes internal.RepoChanges var err error updatedFilenames := make([]string, 0, 10) for _, line := range strings.Split(stdout, "\n") { diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index f38fd6000c..13d06874c9 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -7,86 +7,41 @@ import ( "context" "os" "runtime/pprof" - "strconv" - "strings" + "sync/atomic" "time" "code.gitea.io/gitea/models/db" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/code/bleve" + "code.gitea.io/gitea/modules/indexer/code/elasticsearch" + "code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" ) -// SearchResult result of performing a search in a repo -type SearchResult struct { - RepoID int64 - StartIndex int - EndIndex int - Filename string - Content string - CommitID string - UpdatedUnix timeutil.TimeStamp - Language string - Color string -} - -// SearchResultLanguages result of top languages count in search results -type SearchResultLanguages struct { - Language string - Color string - Count int -} - -// Indexer defines an interface to index and search code contents -type Indexer interface { - Ping() bool - Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error - Delete(repoID int64) error - Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) - Close() -} - -func filenameIndexerID(repoID int64, filename string) string { - return indexerID(repoID) + "_" + filename -} - -func indexerID(id 
int64) string { - return strconv.FormatInt(id, 36) -} - -func parseIndexerID(indexerID string) (int64, string) { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - repoID, _ := strconv.ParseInt(indexerID[:index], 36, 64) - return repoID, indexerID[index+1:] -} - -func filenameOfIndexerID(indexerID string) string { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - return indexerID[index+1:] -} +var ( + indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData] + // globalIndexer is the global indexer, it cannot be nil. + // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. + // So it's always safe use it as *globalIndexer.Load() and call its methods. + globalIndexer atomic.Pointer[internal.Indexer] + dummyIndexer *internal.Indexer +) -// IndexerData represents data stored in the code indexer -type IndexerData struct { - RepoID int64 +func init() { + i := internal.NewDummyIndexer() + dummyIndexer = &i + globalIndexer.Store(dummyIndexer) } -var indexerQueue *queue.WorkerPoolQueue[*IndexerData] - -func index(ctx context.Context, indexer Indexer, repoID int64) error { +func index(ctx context.Context, indexer internal.Indexer, repoID int64) error { repo, err := repo_model.GetRepositoryByID(ctx, repoID) if repo_model.IsErrRepoNotExist(err) { - return indexer.Delete(repoID) + return indexer.Delete(ctx, repoID) } if err != nil { return err @@ -139,7 +94,7 @@ func index(ctx context.Context, indexer Indexer, repoID int64) error { // Init initialize the repo indexer func Init() { if !setting.Indexer.RepoIndexerEnabled { - indexer.Close() + (*globalIndexer.Load()).Close() return } @@ -153,7 +108,7 @@ func Init() { } cancel() log.Debug("Closing repository indexer") - indexer.Close() + (*globalIndexer.Load()).Close() log.Info("PID: %d Repository Indexer 
closed", os.Getpid()) finished() }) @@ -163,13 +118,8 @@ func Init() { // Create the Queue switch setting.Indexer.RepoType { case "bleve", "elasticsearch": - handler := func(items ...*IndexerData) (unhandled []*IndexerData) { - idx, err := indexer.get() - if idx == nil || err != nil { - log.Warn("Codes indexer handler: indexer is not ready, retry later.") - return items - } - + handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) { + indexer := *globalIndexer.Load() for _, indexerData := range items { log.Trace("IndexerData Process Repo: %d", indexerData.RepoID) @@ -188,11 +138,7 @@ func Init() { code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105) */ if err := index(ctx, indexer, indexerData.RepoID); err != nil { - if !idx.Ping() { - log.Error("Code indexer handler: indexer is unavailable.") - unhandled = append(unhandled, indexerData) - continue - } + unhandled = append(unhandled, indexerData) if !setting.IsInTesting { log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err) } @@ -213,8 +159,8 @@ func Init() { pprof.SetGoroutineLabels(ctx) start := time.Now() var ( - rIndexer Indexer - populate bool + rIndexer internal.Indexer + existed bool err error ) switch setting.Indexer.RepoType { @@ -228,10 +174,11 @@ func Init() { } }() - rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath) + rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath) + existed, err = rIndexer.Init(ctx) if err != nil { cancel() - indexer.Close() + (*globalIndexer.Load()).Close() close(waitChannel) log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) } @@ -245,23 +192,31 @@ func Init() { } }() - rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + if err != nil { + 
cancel() + (*globalIndexer.Load()).Close() + close(waitChannel) + log.Fatal("PID: %d Unable to create the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) + } + existed, err = rIndexer.Init(ctx) if err != nil { cancel() - indexer.Close() + (*globalIndexer.Load()).Close() close(waitChannel) log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) } + default: log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType) } - indexer.set(rIndexer) + globalIndexer.Store(&rIndexer) // Start processing the queue go graceful.GetManager().RunWithCancel(indexerQueue) - if populate { + if !existed { // populate the index because it's created for the first time go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) } select { @@ -283,18 +238,18 @@ func Init() { case <-graceful.GetManager().IsShutdown(): log.Warn("Shutdown before Repository Indexer completed initialization") cancel() - indexer.Close() + (*globalIndexer.Load()).Close() case duration, ok := <-waitChannel: if !ok { log.Warn("Repository Indexer Initialization failed") cancel() - indexer.Close() + (*globalIndexer.Load()).Close() return } log.Info("Repository Indexer Initialization took %v", duration) case <-time.After(timeout): cancel() - indexer.Close() + (*globalIndexer.Load()).Close() log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) } }() @@ -303,21 +258,15 @@ func Init() { // UpdateRepoIndexer update a repository's entries in the indexer func UpdateRepoIndexer(repo *repo_model.Repository) { - indexData := &IndexerData{RepoID: repo.ID} + indexData := &internal.IndexerData{RepoID: repo.ID} if err := indexerQueue.Push(indexData); err != nil { log.Error("Update repo index data %v failed: %v", indexData, err) } } // IsAvailable checks if issue indexer is available -func IsAvailable() bool { - idx, err := 
indexer.get() - if err != nil { - log.Error("IsAvailable(): unable to get indexer: %v", err) - return false - } - - return idx.Ping() +func IsAvailable(ctx context.Context) bool { + return (*globalIndexer.Load()).Ping(ctx) == nil } // populateRepoIndexer populate the repo indexer with pre-existing data. This @@ -368,7 +317,7 @@ func populateRepoIndexer(ctx context.Context) { return default: } - if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil { + if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil { log.Error("indexerQueue.Push: %v", err) return } diff --git a/modules/indexer/code/indexer_test.go b/modules/indexer/code/indexer_test.go index 52f7e76e41..55616a0361 100644 --- a/modules/indexer/code/indexer_test.go +++ b/modules/indexer/code/indexer_test.go @@ -5,11 +5,15 @@ package code import ( "context" + "os" "path/filepath" "testing" "code.gitea.io/gitea/models/unittest" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/bleve" + "code.gitea.io/gitea/modules/indexer/code/elasticsearch" + "code.gitea.io/gitea/modules/indexer/code/internal" _ "code.gitea.io/gitea/models" @@ -22,7 +26,7 @@ func TestMain(m *testing.M) { }) } -func testIndexer(name string, t *testing.T, indexer Indexer) { +func testIndexer(name string, t *testing.T, indexer internal.Indexer) { t.Run(name, func(t *testing.T) { var repoID int64 = 1 err := index(git.DefaultContext, indexer, repoID) @@ -81,6 +85,48 @@ func testIndexer(name string, t *testing.T, indexer Indexer) { }) } - assert.NoError(t, indexer.Delete(repoID)) + assert.NoError(t, indexer.Delete(context.Background(), repoID)) }) } + +func TestBleveIndexAndSearch(t *testing.T) { + unittest.PrepareTestEnv(t) + + dir := t.TempDir() + + idx := bleve.NewIndexer(dir) + _, err := idx.Init(context.Background()) + if err != nil { + assert.Fail(t, "Unable to create bleve indexer Error: %v", err) + if idx != nil { + idx.Close() + } + return + } + defer idx.Close() + + 
testIndexer("beleve", t, idx) +} + +func TestESIndexAndSearch(t *testing.T) { + unittest.PrepareTestEnv(t) + + u := os.Getenv("TEST_INDEXER_CODE_ES_URL") + if u == "" { + t.SkipNow() + return + } + + indexer := elasticsearch.NewIndexer(u, "gitea_codes") + if _, err := indexer.Init(context.Background()); err != nil { + assert.Fail(t, "Unable to init ES indexer Error: %v", err) + if indexer != nil { + indexer.Close() + } + return + } + + defer indexer.Close() + + testIndexer("elastic_search", t, indexer) +} diff --git a/modules/indexer/code/internal/indexer.go b/modules/indexer/code/internal/indexer.go new file mode 100644 index 0000000000..da3ac3623c --- /dev/null +++ b/modules/indexer/code/internal/indexer.go @@ -0,0 +1,43 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "context" + "fmt" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/indexer/internal" +) + +// Indexer defines an interface to index and search code contents +type Indexer interface { + internal.Indexer + Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *RepoChanges) error + Delete(ctx context.Context, repoID int64) error + Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) +} + +// NewDummyIndexer returns a dummy indexer +func NewDummyIndexer() Indexer { + return &dummyIndexer{ + Indexer: internal.NewDummyIndexer(), + } +} + +type dummyIndexer struct { + internal.Indexer +} + +func (d *dummyIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *RepoChanges) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Delete(ctx context.Context, repoID int64) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword 
string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { + return 0, nil, nil, fmt.Errorf("indexer is not ready") +} diff --git a/modules/indexer/code/internal/model.go b/modules/indexer/code/internal/model.go new file mode 100644 index 0000000000..f75263c83c --- /dev/null +++ b/modules/indexer/code/internal/model.go @@ -0,0 +1,44 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import "code.gitea.io/gitea/modules/timeutil" + +type FileUpdate struct { + Filename string + BlobSha string + Size int64 + Sized bool +} + +// RepoChanges changes (file additions/updates/removals) to a repo +type RepoChanges struct { + Updates []FileUpdate + RemovedFilenames []string +} + +// IndexerData represents data stored in the code indexer +type IndexerData struct { + RepoID int64 +} + +// SearchResult result of performing a search in a repo +type SearchResult struct { + RepoID int64 + StartIndex int + EndIndex int + Filename string + Content string + CommitID string + UpdatedUnix timeutil.TimeStamp + Language string + Color string +} + +// SearchResultLanguages result of top languages count in search results +type SearchResultLanguages struct { + Language string + Color string + Count int +} diff --git a/modules/indexer/code/internal/util.go b/modules/indexer/code/internal/util.go new file mode 100644 index 0000000000..689c4f4584 --- /dev/null +++ b/modules/indexer/code/internal/util.go @@ -0,0 +1,32 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package internal + +import ( + "strings" + + "code.gitea.io/gitea/modules/indexer/internal" + "code.gitea.io/gitea/modules/log" +) + +func FilenameIndexerID(repoID int64, filename string) string { + return internal.Base36(repoID) + "_" + filename +} + +func ParseIndexerID(indexerID string) (int64, string) { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + repoID, _ := internal.ParseBase36(indexerID[:index]) + return repoID, indexerID[index+1:] +} + +func FilenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} diff --git a/modules/indexer/code/search.go b/modules/indexer/code/search.go index 1de9ffc224..1f9bddff7b 100644 --- a/modules/indexer/code/search.go +++ b/modules/indexer/code/search.go @@ -9,6 +9,7 @@ import ( "strings" "code.gitea.io/gitea/modules/highlight" + "code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" ) @@ -25,6 +26,8 @@ type Result struct { FormattedLines string } +type SearchResultLanguages = internal.SearchResultLanguages + func indices(content string, selectionStartIndex, selectionEndIndex int) (int, int) { startIndex := selectionStartIndex numLinesBefore := 0 @@ -61,7 +64,7 @@ func writeStrings(buf *bytes.Buffer, strs ...string) error { return nil } -func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, error) { +func searchResult(result *internal.SearchResult, startIndex, endIndex int) (*Result, error) { startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n") var formattedLinesBuffer bytes.Buffer @@ -109,12 +112,12 @@ func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, erro } // PerformSearch perform a search on a repository -func PerformSearch(ctx 
context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) { +func PerformSearch(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*internal.SearchResultLanguages, error) { if len(keyword) == 0 { return 0, nil, nil, nil } - total, results, resultLanguages, err := indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) + total, results, resultLanguages, err := (*globalIndexer.Load()).Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) if err != nil { return 0, nil, nil, err } diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go deleted file mode 100644 index 7eed3e8557..0000000000 --- a/modules/indexer/code/wrapped.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package code - -import ( - "context" - "fmt" - "sync" - - repo_model "code.gitea.io/gitea/models/repo" - "code.gitea.io/gitea/modules/log" -) - -var indexer = newWrappedIndexer() - -// ErrWrappedIndexerClosed is the error returned if the indexer was closed before it was ready -var ErrWrappedIndexerClosed = fmt.Errorf("Indexer closed before ready") - -type wrappedIndexer struct { - internal Indexer - lock sync.RWMutex - cond *sync.Cond - closed bool -} - -func newWrappedIndexer() *wrappedIndexer { - w := &wrappedIndexer{} - w.cond = sync.NewCond(w.lock.RLocker()) - return w -} - -func (w *wrappedIndexer) set(indexer Indexer) { - w.lock.Lock() - defer w.lock.Unlock() - if w.closed { - // Too late! 
- indexer.Close() - } - w.internal = indexer - w.cond.Broadcast() -} - -func (w *wrappedIndexer) get() (Indexer, error) { - w.lock.RLock() - defer w.lock.RUnlock() - if w.internal == nil { - if w.closed { - return nil, ErrWrappedIndexerClosed - } - w.cond.Wait() - if w.closed { - return nil, ErrWrappedIndexerClosed - } - } - return w.internal, nil -} - -// Ping checks if elastic is available -func (w *wrappedIndexer) Ping() bool { - indexer, err := w.get() - if err != nil { - log.Warn("Failed to get indexer: %v", err) - return false - } - return indexer.Ping() -} - -func (w *wrappedIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { - indexer, err := w.get() - if err != nil { - return err - } - return indexer.Index(ctx, repo, sha, changes) -} - -func (w *wrappedIndexer) Delete(repoID int64) error { - indexer, err := w.get() - if err != nil { - return err - } - return indexer.Delete(repoID) -} - -func (w *wrappedIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) { - indexer, err := w.get() - if err != nil { - return 0, nil, nil, err - } - return indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch) -} - -func (w *wrappedIndexer) Close() { - w.lock.Lock() - defer w.lock.Unlock() - if w.closed { - return - } - w.closed = true - w.cond.Broadcast() - if w.internal != nil { - w.internal.Close() - } -} diff --git a/modules/indexer/internal/base32.go b/modules/indexer/internal/base32.go new file mode 100644 index 0000000000..aca756c638 --- /dev/null +++ b/modules/indexer/internal/base32.go @@ -0,0 +1,21 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package internal + +import ( + "fmt" + "strconv" +) + +func Base36(i int64) string { + return strconv.FormatInt(i, 36) +} + +func ParseBase36(s string) (int64, error) { + i, err := strconv.ParseInt(s, 36, 64) + if err != nil { + return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err) + } + return i, nil +} diff --git a/modules/indexer/bleve/batch.go b/modules/indexer/internal/bleve/batch.go index 77675147b2..77675147b2 100644 --- a/modules/indexer/bleve/batch.go +++ b/modules/indexer/internal/bleve/batch.go diff --git a/modules/indexer/internal/bleve/indexer.go b/modules/indexer/internal/bleve/indexer.go new file mode 100644 index 0000000000..ce06b5afcb --- /dev/null +++ b/modules/indexer/internal/bleve/indexer.go @@ -0,0 +1,103 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" + "code.gitea.io/gitea/modules/log" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/mapping" + "github.com/ethantkoenig/rupture" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic bleve indexer implementation +type Indexer struct { + Indexer bleve.Index + + indexDir string + version int + mappingGetter MappingGetter +} + +type MappingGetter func() (mapping.IndexMapping, error) + +func NewIndexer(indexDir string, version int, mappingGetter func() (mapping.IndexMapping, error)) *Indexer { + return &Indexer{ + indexDir: indexDir, + version: version, + mappingGetter: mappingGetter, + } +} + +// Init initializes the indexer +func (i *Indexer) Init(_ context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + + if i.Indexer != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + indexer, version, err := openIndexer(i.indexDir, i.version) + if err != nil { + return false, err + } + if indexer != 
nil { + i.Indexer = indexer + return true, nil + } + + if version != 0 { + log.Warn("Found older bleve index with version %d, Gitea will remove it and rebuild", version) + } + + indexMapping, err := i.mappingGetter() + if err != nil { + return false, err + } + + indexer, err = bleve.New(i.indexDir, indexMapping) + if err != nil { + return false, err + } + + if err = rupture.WriteIndexMetadata(i.indexDir, &rupture.IndexMetadata{ + Version: i.version, + }); err != nil { + return false, err + } + + i.Indexer = indexer + + return false, nil +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(_ context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Indexer == nil { + return fmt.Errorf("indexer is not initialized") + } + return nil +} + +func (i *Indexer) Close() { + if i == nil { + return + } + + if err := i.Indexer.Close(); err != nil { + log.Error("Failed to close bleve indexer in %q: %v", i.indexDir, err) + } + i.Indexer = nil +} diff --git a/modules/indexer/internal/bleve/util.go b/modules/indexer/internal/bleve/util.go new file mode 100644 index 0000000000..43a7c3c5ec --- /dev/null +++ b/modules/indexer/internal/bleve/util.go @@ -0,0 +1,49 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "errors" + "os" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/index/upsidedown" + "github.com/ethantkoenig/rupture" +) + +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. 
If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, int, error) { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return nil, 0, nil + } else if err != nil { + return nil, 0, err + } + + metadata, err := rupture.ReadIndexMetadata(path) + if err != nil { + return nil, 0, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, metadata.Version, util.RemoveAll(path) + } + + index, err := bleve.Open(path) + if err != nil { + if errors.Is(err, upsidedown.IncompatibleVersion) { + log.Warn("Indexer was built with a previous version of bleve, deleting and rebuilding") + return nil, 0, util.RemoveAll(path) + } + return nil, 0, err + } + + return index, 0, nil +} diff --git a/modules/indexer/internal/db/indexer.go b/modules/indexer/internal/db/indexer.go new file mode 100644 index 0000000000..3f7e00efbb --- /dev/null +++ b/modules/indexer/internal/db/indexer.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package db + +import ( + "context" + + "code.gitea.io/gitea/modules/indexer/internal" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic db indexer implementation +type Indexer struct{} + +// Init initializes the indexer +func (i *Indexer) Init(_ context.Context) (bool, error) { + // nothing to do + return false, nil +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(_ context.Context) error { + // No need to ping database to check if it is available. + // If the database goes down, Gitea will go down, so nobody will care if the indexer is available. 
+ return nil +} + +// Close closes the indexer +func (i *Indexer) Close() { + // nothing to do +} diff --git a/modules/indexer/internal/elasticsearch/indexer.go b/modules/indexer/internal/elasticsearch/indexer.go new file mode 100644 index 0000000000..2c60efad56 --- /dev/null +++ b/modules/indexer/internal/elasticsearch/indexer.go @@ -0,0 +1,92 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" + + "github.com/olivere/elastic/v7" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic elasticsearch indexer implementation +type Indexer struct { + Client *elastic.Client + + url string + indexName string + version int + mapping string +} + +func NewIndexer(url, indexName string, version int, mapping string) *Indexer { + return &Indexer{ + url: url, + indexName: indexName, + version: version, + mapping: mapping, + } +} + +// Init initializes the indexer +func (i *Indexer) Init(ctx context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + if i.Client != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + client, err := i.initClient() + if err != nil { + return false, err + } + i.Client = client + + exists, err := i.Client.IndexExists(i.VersionedIndexName()).Do(ctx) + if err != nil { + return false, err + } + if exists { + return true, nil + } + + if err := i.createIndex(ctx); err != nil { + return false, err + } + + return exists, nil +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(ctx context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Client == nil { + return fmt.Errorf("indexer is not initialized") + } + + resp, err := i.Client.ClusterHealth().Do(ctx) + if err != nil { + return err + } + if resp.Status != "green" { + // see 
https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html + return fmt.Errorf("status of elasticsearch cluster is %s", resp.Status) + } + return nil +} + +// Close closes the indexer +func (i *Indexer) Close() { + if i == nil { + return + } + i.Client = nil +} diff --git a/modules/indexer/internal/elasticsearch/util.go b/modules/indexer/internal/elasticsearch/util.go new file mode 100644 index 0000000000..9e034bd553 --- /dev/null +++ b/modules/indexer/internal/elasticsearch/util.go @@ -0,0 +1,68 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "context" + "fmt" + "time" + + "code.gitea.io/gitea/modules/log" + + "github.com/olivere/elastic/v7" +) + +// VersionedIndexName returns the full index name with version +func (i *Indexer) VersionedIndexName() string { + return versionedIndexName(i.indexName, i.version) +} + +func versionedIndexName(indexName string, version int) string { + if version == 0 { + // Old index name without version + return indexName + } + return fmt.Sprintf("%s.v%d", indexName, version) +} + +func (i *Indexer) createIndex(ctx context.Context) error { + createIndex, err := i.Client.CreateIndex(i.VersionedIndexName()).BodyString(i.mapping).Do(ctx) + if err != nil { + return err + } + if !createIndex.Acknowledged { + return fmt.Errorf("create index %s with %s failed", i.VersionedIndexName(), i.mapping) + } + + i.checkOldIndexes(ctx) + + return nil +} + +func (i *Indexer) initClient() (*elastic.Client, error) { + opts := []elastic.ClientOptionFunc{ + elastic.SetURL(i.url), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(10 * time.Second), + elastic.SetGzip(false), + } + + logger := log.GetLogger(log.DEFAULT) + + opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace})) + opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info})) + opts = append(opts, 
elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error})) + + return elastic.NewClient(opts...) +} + +func (i *Indexer) checkOldIndexes(ctx context.Context) { + for v := 0; v < i.version; v++ { + indexName := versionedIndexName(i.indexName, v) + exists, err := i.Client.IndexExists(indexName).Do(ctx) + if err == nil && exists { + log.Warn("Found older elasticsearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName) + } + } +} diff --git a/modules/indexer/internal/indexer.go b/modules/indexer/internal/indexer.go new file mode 100644 index 0000000000..c7f356da1e --- /dev/null +++ b/modules/indexer/internal/indexer.go @@ -0,0 +1,37 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "context" + "fmt" +) + +// Indexer defines a basic indexer interface +type Indexer interface { + // Init initializes the indexer + // returns true if the index was opened/existed (with data populated), false if it was created/not-existed (with no data) + Init(ctx context.Context) (bool, error) + // Ping checks if the indexer is available + Ping(ctx context.Context) error + // Close closes the indexer + Close() +} + +// NewDummyIndexer returns a dummy indexer +func NewDummyIndexer() Indexer { + return &dummyIndexer{} +} + +type dummyIndexer struct{} + +func (d *dummyIndexer) Init(ctx context.Context) (bool, error) { + return false, fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Ping(ctx context.Context) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Close() {} diff --git a/modules/indexer/internal/meilisearch/indexer.go b/modules/indexer/internal/meilisearch/indexer.go new file mode 100644 index 0000000000..06747ff7e0 --- /dev/null +++ b/modules/indexer/internal/meilisearch/indexer.go @@ -0,0 +1,92 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "context" + "fmt" + + "github.com/meilisearch/meilisearch-go" +) + +// Indexer represents a basic meilisearch indexer implementation +type Indexer struct { + Client *meilisearch.Client + + url, apiKey string + indexName string + version int +} + +func NewIndexer(url, apiKey, indexName string, version int) *Indexer { + return &Indexer{ + url: url, + apiKey: apiKey, + indexName: indexName, + version: version, + } +} + +// Init initializes the indexer +func (i *Indexer) Init(_ context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + + if i.Client != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + i.Client = meilisearch.NewClient(meilisearch.ClientConfig{ + Host: i.url, + APIKey: i.apiKey, + }) + + _, err := i.Client.GetIndex(i.VersionedIndexName()) + if err == nil { + return true, nil + } + _, err = i.Client.CreateIndex(&meilisearch.IndexConfig{ + Uid: i.VersionedIndexName(), + PrimaryKey: "id", + }) + if err != nil { + return false, err + } + + i.checkOldIndexes() + + _, err = i.Client.Index(i.VersionedIndexName()).UpdateFilterableAttributes(&[]string{"repo_id"}) + return false, err +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(ctx context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Client == nil { + return fmt.Errorf("indexer is not initialized") + } + resp, err := i.Client.Health() + if err != nil { + return err + } + if resp.Status != "available" { + // See https://docs.meilisearch.com/reference/api/health.html#status + return fmt.Errorf("status of meilisearch is not available: %s", resp.Status) + } + return nil +} + +// Close closes the indexer +func (i *Indexer) Close() { + if i == nil { + return + } + if i.Client == nil { + return + } + i.Client = nil +} diff --git a/modules/indexer/internal/meilisearch/util.go 
b/modules/indexer/internal/meilisearch/util.go new file mode 100644 index 0000000000..e6d8fefade --- /dev/null +++ b/modules/indexer/internal/meilisearch/util.go @@ -0,0 +1,38 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "fmt" + + "code.gitea.io/gitea/modules/log" +) + +// VersionedIndexName returns the full index name with version +func (i *Indexer) VersionedIndexName() string { + return versionedIndexName(i.indexName, i.version) +} + +func versionedIndexName(indexName string, version int) string { + if version == 0 { + // Old index name without version + return indexName + } + + // The format of the index name is <index_name>_v<version>, not <index_name>.v<version> like elasticsearch. + // Because meilisearch does not support "." in index name, it should contain only alphanumeric characters, hyphens (-) and underscores (_). + // See https://www.meilisearch.com/docs/learn/core_concepts/indexes#index-uid + + return fmt.Sprintf("%s_v%d", indexName, version) +} + +func (i *Indexer) checkOldIndexes() { + for v := 0; v < i.version; v++ { + indexName := versionedIndexName(i.indexName, v) + _, err := i.Client.GetIndex(indexName) + if err == nil { + log.Warn("Found older meilisearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName) + } + } +} diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve/bleve.go index 60d9ef7617..bb0bc4b04a 100644 --- a/modules/indexer/issues/bleve.go +++ b/modules/indexer/issues/bleve/bleve.go @@ -1,17 +1,14 @@ // Copyright 2018 The Gitea Authors. All rights reserved. 
// SPDX-License-Identifier: MIT -package issues +package bleve import ( "context" - "fmt" - "os" - "strconv" - gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/util" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_bleve "code.gitea.io/gitea/modules/indexer/internal/bleve" + "code.gitea.io/gitea/modules/indexer/issues/internal" "github.com/blevesearch/bleve/v2" "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" @@ -19,10 +16,8 @@ import ( "github.com/blevesearch/bleve/v2/analysis/token/lowercase" "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" - "github.com/blevesearch/bleve/v2/index/upsidedown" "github.com/blevesearch/bleve/v2/mapping" "github.com/blevesearch/bleve/v2/search/query" - "github.com/ethantkoenig/rupture" ) const ( @@ -31,20 +26,6 @@ const ( issueIndexerLatestVersion = 2 ) -// indexerID a bleve-compatible unique identifier for an integer id -func indexerID(id int64) string { - return strconv.FormatInt(id, 36) -} - -// idOfIndexerID the integer id associated with an indexer id -func idOfIndexerID(indexerID string) (int64, error) { - id, err := strconv.ParseInt(indexerID, 36, 64) - if err != nil { - return 0, fmt.Errorf("Unexpected indexer ID %s: %w", indexerID, err) - } - return id, nil -} - // numericEqualityQuery a numeric equality query for the given value and field func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { f := float64(value) @@ -72,49 +53,16 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { const maxBatchSize = 16 -// openIndexer open the index at the specified path, checking for metadata -// updates and bleve version updates. 
If index needs to be created (or -// re-created), returns (nil, nil) -func openIndexer(path string, latestVersion int) (bleve.Index, error) { - _, err := os.Stat(path) - if err != nil && os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, err - } - - metadata, err := rupture.ReadIndexMetadata(path) - if err != nil { - return nil, err - } - if metadata.Version < latestVersion { - // the indexer is using a previous version, so we should delete it and - // re-populate - return nil, util.RemoveAll(path) - } - - index, err := bleve.Open(path) - if err != nil && err == upsidedown.IncompatibleVersion { - // the indexer was built with a previous version of bleve, so we should - // delete it and re-populate - return nil, util.RemoveAll(path) - } else if err != nil { - return nil, err - } - - return index, nil -} - -// BleveIndexerData an update to the issue indexer -type BleveIndexerData IndexerData +// IndexerData an update to the issue indexer +type IndexerData internal.IndexerData // Type returns the document type, for bleve's mapping.Classifier interface. 
-func (i *BleveIndexerData) Type() string { +func (i *IndexerData) Type() string { return issueIndexerDocType } -// createIssueIndexer create an issue indexer if one does not already exist -func createIssueIndexer(path string, latestVersion int) (bleve.Index, error) { +// generateIssueIndexMapping generates the bleve index mapping for issues +func generateIssueIndexMapping() (mapping.IndexMapping, error) { mapping := bleve.NewIndexMapping() docMapping := bleve.NewDocumentMapping() @@ -144,68 +92,31 @@ func createIssueIndexer(path string, latestVersion int) (bleve.Index, error) { mapping.AddDocumentMapping(issueIndexerDocType, docMapping) mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - index, err := bleve.New(path, mapping) - if err != nil { - return nil, err - } - - if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ - Version: latestVersion, - }); err != nil { - return nil, err - } - return index, nil + return mapping, nil } -var _ Indexer = &BleveIndexer{} +var _ internal.Indexer = &Indexer{} -// BleveIndexer implements Indexer interface -type BleveIndexer struct { - indexDir string - indexer bleve.Index -} - -// NewBleveIndexer creates a new bleve local indexer -func NewBleveIndexer(indexDir string) *BleveIndexer { - return &BleveIndexer{ - indexDir: indexDir, - } +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_bleve.Indexer + indexer_internal.Indexer // do not composite inner_bleve.Indexer directly to avoid exposing too much } -// Init will initialize the indexer -func (b *BleveIndexer) Init() (bool, error) { - var err error - b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion) - if err != nil { - return false, err - } - if b.indexer != nil { - return true, nil - } - - b.indexer, err = createIssueIndexer(b.indexDir, issueIndexerLatestVersion) - return false, err -} - -// Ping does nothing -func (b *BleveIndexer) Ping() bool { - return true -} - -// Close will close the bleve 
indexer -func (b *BleveIndexer) Close() { - if b.indexer != nil { - if err := b.indexer.Close(); err != nil { - log.Error("Error whilst closing indexer: %v", err) - } +// NewIndexer creates a new bleve local indexer +func NewIndexer(indexDir string) *Indexer { + inner := inner_bleve.NewIndexer(indexDir, issueIndexerLatestVersion, generateIssueIndexMapping) + return &Indexer{ + Indexer: inner, + inner: inner, } } // Index will save the index data -func (b *BleveIndexer) Index(issues []*IndexerData) error { - batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) +func (b *Indexer) Index(_ context.Context, issues []*internal.IndexerData) error { + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) for _, issue := range issues { - if err := batch.Index(indexerID(issue.ID), struct { + if err := batch.Index(indexer_internal.Base36(issue.ID), struct { RepoID int64 Title string Content string @@ -223,10 +134,10 @@ func (b *BleveIndexer) Index(issues []*IndexerData) error { } // Delete deletes indexes by ids -func (b *BleveIndexer) Delete(ids ...int64) error { - batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) +func (b *Indexer) Delete(_ context.Context, ids ...int64) error { + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) for _, id := range ids { - if err := batch.Delete(indexerID(id)); err != nil { + if err := batch.Delete(indexer_internal.Base36(id)); err != nil { return err } } @@ -235,7 +146,7 @@ func (b *BleveIndexer) Delete(ids ...int64) error { // Search searches for issues by given conditions. 
// Returns the matching issue IDs -func (b *BleveIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { +func (b *Indexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*internal.SearchResult, error) { var repoQueriesP []*query.NumericRangeQuery for _, repoID := range repoIDs { repoQueriesP = append(repoQueriesP, numericEqualityQuery(repoID, "RepoID")) @@ -255,20 +166,20 @@ func (b *BleveIndexer) Search(ctx context.Context, keyword string, repoIDs []int search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false) search.SortBy([]string{"-_score"}) - result, err := b.indexer.SearchInContext(ctx, search) + result, err := b.inner.Indexer.SearchInContext(ctx, search) if err != nil { return nil, err } - ret := SearchResult{ - Hits: make([]Match, 0, len(result.Hits)), + ret := internal.SearchResult{ + Hits: make([]internal.Match, 0, len(result.Hits)), } for _, hit := range result.Hits { - id, err := idOfIndexerID(hit.ID) + id, err := indexer_internal.ParseBase36(hit.ID) if err != nil { return nil, err } - ret.Hits = append(ret.Hits, Match{ + ret.Hits = append(ret.Hits, internal.Match{ ID: id, }) } diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve/bleve_test.go index 22827158e4..f890f8eb48 100644 --- a/modules/indexer/issues/bleve_test.go +++ b/modules/indexer/issues/bleve/bleve_test.go @@ -1,26 +1,28 @@ // Copyright 2018 The Gitea Authors. All rights reserved. 
// SPDX-License-Identifier: MIT -package issues +package bleve import ( "context" "testing" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "github.com/stretchr/testify/assert" ) func TestBleveIndexAndSearch(t *testing.T) { dir := t.TempDir() - indexer := NewBleveIndexer(dir) + indexer := NewIndexer(dir) defer indexer.Close() - if _, err := indexer.Init(); err != nil { + if _, err := indexer.Init(context.Background()); err != nil { assert.Fail(t, "Unable to initialize bleve indexer: %v", err) return } - err := indexer.Index([]*IndexerData{ + err := indexer.Index(context.Background(), []*internal.IndexerData{ { ID: 1, RepoID: 2, diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go deleted file mode 100644 index 04c101c356..0000000000 --- a/modules/indexer/issues/db.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package issues - -import ( - "context" - - "code.gitea.io/gitea/models/db" - issues_model "code.gitea.io/gitea/models/issues" -) - -// DBIndexer implements Indexer interface to use database's like search -type DBIndexer struct{} - -// Init dummy function -func (i *DBIndexer) Init() (bool, error) { - return false, nil -} - -// Ping checks if database is available -func (i *DBIndexer) Ping() bool { - return db.GetEngine(db.DefaultContext).Ping() != nil -} - -// Index dummy function -func (i *DBIndexer) Index(issue []*IndexerData) error { - return nil -} - -// Delete dummy function -func (i *DBIndexer) Delete(ids ...int64) error { - return nil -} - -// Close dummy function -func (i *DBIndexer) Close() { -} - -// Search dummy function -func (i *DBIndexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { - total, ids, err := issues_model.SearchIssueIDsByKeyword(ctx, kw, repoIDs, limit, start) - if err != nil { - return nil, err - } - result := SearchResult{ - Total: total, - Hits: make([]Match, 0, limit), 
- } - for _, id := range ids { - result.Hits = append(result.Hits, Match{ - ID: id, - }) - } - return &result, nil -} diff --git a/modules/indexer/issues/db/db.go b/modules/indexer/issues/db/db.go new file mode 100644 index 0000000000..17ed426b38 --- /dev/null +++ b/modules/indexer/issues/db/db.go @@ -0,0 +1,54 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package db + +import ( + "context" + + issues_model "code.gitea.io/gitea/models/issues" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_db "code.gitea.io/gitea/modules/indexer/internal/db" + "code.gitea.io/gitea/modules/indexer/issues/internal" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface to use database's like search +type Indexer struct { + indexer_internal.Indexer +} + +func NewIndexer() *Indexer { + return &Indexer{ + Indexer: &inner_db.Indexer{}, + } +} + +// Index dummy function +func (i *Indexer) Index(_ context.Context, _ []*internal.IndexerData) error { + return nil +} + +// Delete dummy function +func (i *Indexer) Delete(_ context.Context, _ ...int64) error { + return nil +} + +// Search searches for issues +func (i *Indexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*internal.SearchResult, error) { + total, ids, err := issues_model.SearchIssueIDsByKeyword(ctx, kw, repoIDs, limit, start) + if err != nil { + return nil, err + } + result := internal.SearchResult{ + Total: total, + Hits: make([]internal.Match, 0, limit), + } + for _, id := range ids { + result.Hits = append(result.Hits, internal.Match{ + ID: id, + }) + } + return &result, nil +} diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go deleted file mode 100644 index ec62f857ad..0000000000 --- a/modules/indexer/issues/elastic_search.go +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. 
-// SPDX-License-Identifier: MIT - -package issues - -import ( - "context" - "errors" - "fmt" - "net" - "strconv" - "sync" - "time" - - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" - - "github.com/olivere/elastic/v7" -) - -var _ Indexer = &ElasticSearchIndexer{} - -// ElasticSearchIndexer implements Indexer interface -type ElasticSearchIndexer struct { - client *elastic.Client - indexerName string - available bool - stopTimer chan struct{} - lock sync.RWMutex -} - -// NewElasticSearchIndexer creates a new elasticsearch indexer -func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, error) { - opts := []elastic.ClientOptionFunc{ - elastic.SetURL(url), - elastic.SetSniff(false), - elastic.SetHealthcheckInterval(10 * time.Second), - elastic.SetGzip(false), - } - - logger := log.GetLogger(log.DEFAULT) - opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace})) - opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info})) - opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error})) - - client, err := elastic.NewClient(opts...) 
- if err != nil { - return nil, err - } - - indexer := &ElasticSearchIndexer{ - client: client, - indexerName: indexerName, - available: true, - stopTimer: make(chan struct{}), - } - - ticker := time.NewTicker(10 * time.Second) - go func() { - for { - select { - case <-ticker.C: - indexer.checkAvailability() - case <-indexer.stopTimer: - ticker.Stop() - return - } - } - }() - - return indexer, nil -} - -const ( - defaultMapping = `{ - "mappings": { - "properties": { - "id": { - "type": "integer", - "index": true - }, - "repo_id": { - "type": "integer", - "index": true - }, - "title": { - "type": "text", - "index": true - }, - "content": { - "type": "text", - "index": true - }, - "comments": { - "type" : "text", - "index": true - } - } - } - }` -) - -// Init will initialize the indexer -func (b *ElasticSearchIndexer) Init() (bool, error) { - ctx := graceful.GetManager().HammerContext() - exists, err := b.client.IndexExists(b.indexerName).Do(ctx) - if err != nil { - return false, b.checkError(err) - } - - if !exists { - mapping := defaultMapping - - createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx) - if err != nil { - return false, b.checkError(err) - } - if !createIndex.Acknowledged { - return false, errors.New("init failed") - } - - return false, nil - } - return true, nil -} - -// Ping checks if elastic is available -func (b *ElasticSearchIndexer) Ping() bool { - b.lock.RLock() - defer b.lock.RUnlock() - return b.available -} - -// Index will save the index data -func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error { - if len(issues) == 0 { - return nil - } else if len(issues) == 1 { - issue := issues[0] - _, err := b.client.Index(). - Index(b.indexerName). - Id(fmt.Sprintf("%d", issue.ID)). - BodyJson(map[string]interface{}{ - "id": issue.ID, - "repo_id": issue.RepoID, - "title": issue.Title, - "content": issue.Content, - "comments": issue.Comments, - }). 
- Do(graceful.GetManager().HammerContext()) - return b.checkError(err) - } - - reqs := make([]elastic.BulkableRequest, 0) - for _, issue := range issues { - reqs = append(reqs, - elastic.NewBulkIndexRequest(). - Index(b.indexerName). - Id(fmt.Sprintf("%d", issue.ID)). - Doc(map[string]interface{}{ - "id": issue.ID, - "repo_id": issue.RepoID, - "title": issue.Title, - "content": issue.Content, - "comments": issue.Comments, - }), - ) - } - - _, err := b.client.Bulk(). - Index(b.indexerName). - Add(reqs...). - Do(graceful.GetManager().HammerContext()) - return b.checkError(err) -} - -// Delete deletes indexes by ids -func (b *ElasticSearchIndexer) Delete(ids ...int64) error { - if len(ids) == 0 { - return nil - } else if len(ids) == 1 { - _, err := b.client.Delete(). - Index(b.indexerName). - Id(fmt.Sprintf("%d", ids[0])). - Do(graceful.GetManager().HammerContext()) - return b.checkError(err) - } - - reqs := make([]elastic.BulkableRequest, 0) - for _, id := range ids { - reqs = append(reqs, - elastic.NewBulkDeleteRequest(). - Index(b.indexerName). - Id(fmt.Sprintf("%d", id)), - ) - } - - _, err := b.client.Bulk(). - Index(b.indexerName). - Add(reqs...). - Do(graceful.GetManager().HammerContext()) - return b.checkError(err) -} - -// Search searches for issues by given conditions. -// Returns the matching issue IDs -func (b *ElasticSearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { - kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments") - query := elastic.NewBoolQuery() - query = query.Must(kwQuery) - if len(repoIDs) > 0 { - repoStrs := make([]interface{}, 0, len(repoIDs)) - for _, repoID := range repoIDs { - repoStrs = append(repoStrs, repoID) - } - repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...) - query = query.Must(repoQuery) - } - searchResult, err := b.client.Search(). - Index(b.indexerName). - Query(query). - Sort("_score", false). - From(start).Size(limit). 
- Do(ctx) - if err != nil { - return nil, b.checkError(err) - } - - hits := make([]Match, 0, limit) - for _, hit := range searchResult.Hits.Hits { - id, _ := strconv.ParseInt(hit.Id, 10, 64) - hits = append(hits, Match{ - ID: id, - }) - } - - return &SearchResult{ - Total: searchResult.TotalHits(), - Hits: hits, - }, nil -} - -// Close implements indexer -func (b *ElasticSearchIndexer) Close() { - select { - case <-b.stopTimer: - default: - close(b.stopTimer) - } -} - -func (b *ElasticSearchIndexer) checkError(err error) error { - var opErr *net.OpError - if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) { - return err - } - - b.setAvailability(false) - - return err -} - -func (b *ElasticSearchIndexer) checkAvailability() { - if b.Ping() { - return - } - - // Request cluster state to check if elastic is available again - _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext()) - if err != nil { - b.setAvailability(false) - return - } - - b.setAvailability(true) -} - -func (b *ElasticSearchIndexer) setAvailability(available bool) { - b.lock.Lock() - defer b.lock.Unlock() - - if b.available == available { - return - } - - b.available = available -} diff --git a/modules/indexer/issues/elasticsearch/elasticsearch.go b/modules/indexer/issues/elasticsearch/elasticsearch.go new file mode 100644 index 0000000000..33a7dfc21e --- /dev/null +++ b/modules/indexer/issues/elasticsearch/elasticsearch.go @@ -0,0 +1,177 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "context" + "fmt" + "strconv" + + "code.gitea.io/gitea/modules/graceful" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch" + "code.gitea.io/gitea/modules/indexer/issues/internal" + + "github.com/olivere/elastic/v7" +) + +const ( + issueIndexerLatestVersion = 0 +) + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_elasticsearch.Indexer + indexer_internal.Indexer // do not composite inner_elasticsearch.Indexer directly to avoid exposing too much +} + +// NewIndexer creates a new elasticsearch indexer +func NewIndexer(url, indexerName string) *Indexer { + inner := inner_elasticsearch.NewIndexer(url, indexerName, issueIndexerLatestVersion, defaultMapping) + indexer := &Indexer{ + inner: inner, + Indexer: inner, + } + return indexer +} + +const ( + defaultMapping = `{ + "mappings": { + "properties": { + "id": { + "type": "integer", + "index": true + }, + "repo_id": { + "type": "integer", + "index": true + }, + "title": { + "type": "text", + "index": true + }, + "content": { + "type": "text", + "index": true + }, + "comments": { + "type" : "text", + "index": true + } + } + } + }` +) + +// Index will save the index data +func (b *Indexer) Index(ctx context.Context, issues []*internal.IndexerData) error { + if len(issues) == 0 { + return nil + } else if len(issues) == 1 { + issue := issues[0] + _, err := b.inner.Client.Index(). + Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", issue.ID)). + BodyJson(map[string]interface{}{ + "id": issue.ID, + "repo_id": issue.RepoID, + "title": issue.Title, + "content": issue.Content, + "comments": issue.Comments, + }). + Do(ctx) + return err + } + + reqs := make([]elastic.BulkableRequest, 0) + for _, issue := range issues { + reqs = append(reqs, + elastic.NewBulkIndexRequest(). 
+ Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", issue.ID)). + Doc(map[string]interface{}{ + "id": issue.ID, + "repo_id": issue.RepoID, + "title": issue.Title, + "content": issue.Content, + "comments": issue.Comments, + }), + ) + } + + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). + Add(reqs...). + Do(graceful.GetManager().HammerContext()) + return err +} + +// Delete deletes indexes by ids +func (b *Indexer) Delete(ctx context.Context, ids ...int64) error { + if len(ids) == 0 { + return nil + } else if len(ids) == 1 { + _, err := b.inner.Client.Delete(). + Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", ids[0])). + Do(ctx) + return err + } + + reqs := make([]elastic.BulkableRequest, 0) + for _, id := range ids { + reqs = append(reqs, + elastic.NewBulkDeleteRequest(). + Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", id)), + ) + } + + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). + Add(reqs...). + Do(graceful.GetManager().HammerContext()) + return err +} + +// Search searches for issues by given conditions. +// Returns the matching issue IDs +func (b *Indexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*internal.SearchResult, error) { + kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments") + query := elastic.NewBoolQuery() + query = query.Must(kwQuery) + if len(repoIDs) > 0 { + repoStrs := make([]interface{}, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoStrs = append(repoStrs, repoID) + } + repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...) + query = query.Must(repoQuery) + } + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). + Query(query). + Sort("_score", false). + From(start).Size(limit). 
+ Do(ctx) + if err != nil { + return nil, err + } + + hits := make([]internal.Match, 0, limit) + for _, hit := range searchResult.Hits.Hits { + id, _ := strconv.ParseInt(hit.Id, 10, 64) + hits = append(hits, internal.Match{ + ID: id, + }) + } + + return &internal.SearchResult{ + Total: searchResult.TotalHits(), + Hits: hits, + }, nil +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index f36ea10935..9e2f13371e 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,16 +5,20 @@ package issues import ( "context" - "fmt" "os" "runtime/pprof" - "sync" + "sync/atomic" "time" - "code.gitea.io/gitea/models/db" + db_model "code.gitea.io/gitea/models/db" issues_model "code.gitea.io/gitea/models/issues" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/issues/bleve" + "code.gitea.io/gitea/modules/indexer/issues/db" + "code.gitea.io/gitea/modules/indexer/issues/elasticsearch" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/indexer/issues/meilisearch" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/queue" @@ -22,81 +26,22 @@ import ( "code.gitea.io/gitea/modules/util" ) -// IndexerData data stored in the issue indexer -type IndexerData struct { - ID int64 `json:"id"` - RepoID int64 `json:"repo_id"` - Title string `json:"title"` - Content string `json:"content"` - Comments []string `json:"comments"` - IsDelete bool `json:"is_delete"` - IDs []int64 `json:"ids"` -} - -// Match represents on search result -type Match struct { - ID int64 `json:"id"` - Score float64 `json:"score"` -} - -// SearchResult represents search results -type SearchResult struct { - Total int64 - Hits []Match -} - -// Indexer defines an interface to indexer issues contents -type Indexer interface { - Init() (bool, error) - Ping() bool - Index(issue []*IndexerData) error - 
Delete(ids ...int64) error - Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) - Close() -} - -type indexerHolder struct { - indexer Indexer - mutex sync.RWMutex - cond *sync.Cond - cancelled bool -} - -func newIndexerHolder() *indexerHolder { - h := &indexerHolder{} - h.cond = sync.NewCond(h.mutex.RLocker()) - return h -} - -func (h *indexerHolder) cancel() { - h.mutex.Lock() - defer h.mutex.Unlock() - h.cancelled = true - h.cond.Broadcast() -} - -func (h *indexerHolder) set(indexer Indexer) { - h.mutex.Lock() - defer h.mutex.Unlock() - h.indexer = indexer - h.cond.Broadcast() -} - -func (h *indexerHolder) get() Indexer { - h.mutex.RLock() - defer h.mutex.RUnlock() - if h.indexer == nil && !h.cancelled { - h.cond.Wait() - } - return h.indexer -} - var ( // issueIndexerQueue queue of issue ids to be updated - issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData] - holder = newIndexerHolder() + issueIndexerQueue *queue.WorkerPoolQueue[*internal.IndexerData] + // globalIndexer is the global indexer, it cannot be nil. + // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. + // So it's always safe to use it as *globalIndexer.Load() and call its methods. + globalIndexer atomic.Pointer[internal.Indexer] + dummyIndexer *internal.Indexer ) +func init() { + i := internal.NewDummyIndexer() + dummyIndexer = &i + globalIndexer.Store(dummyIndexer) +} + // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until // all issue index done. 
func InitIssueIndexer(syncReindex bool) { @@ -107,33 +52,23 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { case "bleve", "elasticsearch", "meilisearch": - handler := func(items ...*IndexerData) (unhandled []*IndexerData) { - indexer := holder.get() - if indexer == nil { - log.Warn("Issue indexer handler: indexer is not ready, retry later.") - return items - } - toIndex := make([]*IndexerData, 0, len(items)) + handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) { + indexer := *globalIndexer.Load() + toIndex := make([]*internal.IndexerData, 0, len(items)) for _, indexerData := range items { log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) if indexerData.IsDelete { - if err := indexer.Delete(indexerData.IDs...); err != nil { + if err := indexer.Delete(ctx, indexerData.IDs...); err != nil { log.Error("Issue indexer handler: failed to from index: %v Error: %v", indexerData.IDs, err) - if !indexer.Ping() { - log.Error("Issue indexer handler: indexer is unavailable when deleting") - unhandled = append(unhandled, indexerData) - } + unhandled = append(unhandled, indexerData) } continue } toIndex = append(toIndex, indexerData) } - if err := indexer.Index(toIndex); err != nil { + if err := indexer.Index(ctx, toIndex); err != nil { log.Error("Error whilst indexing: %v Error: %v", toIndex, err) - if !indexer.Ping() { - log.Error("Issue indexer handler: indexer is unavailable when indexing") - unhandled = append(unhandled, toIndex...) - } + unhandled = append(unhandled, toIndex...) 
} return unhandled } @@ -144,7 +79,7 @@ func InitIssueIndexer(syncReindex bool) { log.Fatal("Unable to create issue indexer queue") } default: - issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil) + issueIndexerQueue = queue.CreateSimpleQueue[*internal.IndexerData](ctx, "issue_indexer", nil) } graceful.GetManager().RunAtTerminate(finished) @@ -154,7 +89,11 @@ func InitIssueIndexer(syncReindex bool) { pprof.SetGoroutineLabels(ctx) start := time.Now() log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType) - var populate bool + var ( + issueIndexer internal.Indexer + existed bool + err error + ) switch setting.Indexer.IssueType { case "bleve": defer func() { @@ -162,62 +101,45 @@ func InitIssueIndexer(syncReindex bool) { log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2)) log.Error("The indexer files are likely corrupted and may need to be deleted") log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.IssuePath) - holder.cancel() + globalIndexer.Store(dummyIndexer) log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err) } }() - issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() + issueIndexer = bleve.NewIndexer(setting.Indexer.IssuePath) + existed, err = issueIndexer.Init(ctx) if err != nil { - holder.cancel() log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err) } - populate = !exist - holder.set(issueIndexer) - graceful.GetManager().RunAtTerminate(func() { - log.Debug("Closing issue indexer") - issueIndexer := holder.get() - if issueIndexer != nil { - issueIndexer.Close() - } - log.Info("PID: %d Issue Indexer closed", os.Getpid()) - }) - log.Debug("Created Bleve Indexer") case "elasticsearch": - issueIndexer, err := 
NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() + issueIndexer = elasticsearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) + existed, err = issueIndexer.Init(ctx) if err != nil { log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) } - populate = !exist - holder.set(issueIndexer) case "db": - issueIndexer := &DBIndexer{} - holder.set(issueIndexer) + issueIndexer = db.NewIndexer() case "meilisearch": - issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() + issueIndexer = meilisearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) + existed, err = issueIndexer.Init(ctx) if err != nil { log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) } - populate = !exist - holder.set(issueIndexer) default: - holder.cancel() log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } + globalIndexer.Store(&issueIndexer) + + graceful.GetManager().RunAtTerminate(func() { + log.Debug("Closing issue indexer") + (*globalIndexer.Load()).Close() + log.Info("PID: %d Issue Indexer closed", os.Getpid()) + }) // Start processing the queue go graceful.GetManager().RunWithCancel(issueIndexerQueue) // Populate the index - if populate { + if !existed { if syncReindex { graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } else { @@ -266,8 +188,8 @@ func populateIssueIndexer(ctx 
context.Context) { default: } repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{ - ListOptions: db.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize}, - OrderBy: db.SearchOrderByID, + ListOptions: db_model.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize}, + OrderBy: db_model.SearchOrderByID, Private: true, Collaborate: util.OptionalBoolFalse, }) @@ -320,7 +242,7 @@ func UpdateIssueIndexer(issue *issues_model.Issue) { comments = append(comments, comment.Content) } } - indexerData := &IndexerData{ + indexerData := &internal.IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, @@ -345,7 +267,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) { if len(ids) == 0 { return } - indexerData := &IndexerData{ + indexerData := &internal.IndexerData{ IDs: ids, IsDelete: true, } @@ -358,12 +280,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) { // WARNNING: You have to ensure user have permission to visit repoIDs' issues func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) { var issueIDs []int64 - indexer := holder.get() - - if indexer == nil { - log.Error("SearchIssuesByKeyword(): unable to get indexer!") - return nil, fmt.Errorf("unable to get issue indexer") - } + indexer := *globalIndexer.Load() res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0) if err != nil { return nil, err @@ -375,12 +292,6 @@ func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) } // IsAvailable checks if issue indexer is available -func IsAvailable() bool { - indexer := holder.get() - if indexer == nil { - log.Error("IsAvailable(): unable to get indexer!") - return false - } - - return indexer.Ping() +func IsAvailable(ctx context.Context) bool { + return (*globalIndexer.Load()).Ping(ctx) == nil } diff --git a/modules/indexer/issues/indexer_test.go 
b/modules/indexer/issues/indexer_test.go index a2d1794f4b..5962a4ee9c 100644 --- a/modules/indexer/issues/indexer_test.go +++ b/modules/indexer/issues/indexer_test.go @@ -11,6 +11,7 @@ import ( "time" "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/indexer/issues/bleve" "code.gitea.io/gitea/modules/setting" _ "code.gitea.io/gitea/models" @@ -42,8 +43,7 @@ func TestBleveSearchIssues(t *testing.T) { setting.LoadQueueSettings() InitIssueIndexer(true) defer func() { - indexer := holder.get() - if bleveIndexer, ok := indexer.(*BleveIndexer); ok { + if bleveIndexer, ok := (*globalIndexer.Load()).(*bleve.Indexer); ok { bleveIndexer.Close() } }() diff --git a/modules/indexer/issues/internal/indexer.go b/modules/indexer/issues/internal/indexer.go new file mode 100644 index 0000000000..553c8a573c --- /dev/null +++ b/modules/indexer/issues/internal/indexer.go @@ -0,0 +1,42 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" +) + +// Indexer defines an interface to indexer issues contents +type Indexer interface { + internal.Indexer + Index(ctx context.Context, issue []*IndexerData) error + Delete(ctx context.Context, ids ...int64) error + Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) +} + +// NewDummyIndexer returns a dummy indexer +func NewDummyIndexer() Indexer { + return &dummyIndexer{ + Indexer: internal.NewDummyIndexer(), + } +} + +type dummyIndexer struct { + internal.Indexer +} + +func (d *dummyIndexer) Index(ctx context.Context, issue []*IndexerData) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Delete(ctx context.Context, ids ...int64) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { + 
return nil, fmt.Errorf("indexer is not ready") +} diff --git a/modules/indexer/issues/internal/model.go b/modules/indexer/issues/internal/model.go new file mode 100644 index 0000000000..8c206fc1cf --- /dev/null +++ b/modules/indexer/issues/internal/model.go @@ -0,0 +1,27 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +// IndexerData data stored in the issue indexer +type IndexerData struct { + ID int64 `json:"id"` + RepoID int64 `json:"repo_id"` + Title string `json:"title"` + Content string `json:"content"` + Comments []string `json:"comments"` + IsDelete bool `json:"is_delete"` + IDs []int64 `json:"ids"` +} + +// Match represents one search result +type Match struct { + ID int64 `json:"id"` + Score float64 `json:"score"` +} + +// SearchResult represents search results +type SearchResult struct { + Total int64 + Hits []Match +} diff --git a/modules/indexer/issues/meilisearch.go b/modules/indexer/issues/meilisearch.go deleted file mode 100644 index 990bc57a05..0000000000 --- a/modules/indexer/issues/meilisearch.go +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2023 The Gitea Authors. All rights reserved. 
-// SPDX-License-Identifier: MIT - -package issues - -import ( - "context" - "strconv" - "strings" - "sync" - "time" - - "github.com/meilisearch/meilisearch-go" -) - -var _ Indexer = &MeilisearchIndexer{} - -// MeilisearchIndexer implements Indexer interface -type MeilisearchIndexer struct { - client *meilisearch.Client - indexerName string - available bool - stopTimer chan struct{} - lock sync.RWMutex -} - -// MeilisearchIndexer creates a new meilisearch indexer -func NewMeilisearchIndexer(url, apiKey, indexerName string) (*MeilisearchIndexer, error) { - client := meilisearch.NewClient(meilisearch.ClientConfig{ - Host: url, - APIKey: apiKey, - }) - - indexer := &MeilisearchIndexer{ - client: client, - indexerName: indexerName, - available: true, - stopTimer: make(chan struct{}), - } - - ticker := time.NewTicker(10 * time.Second) - go func() { - for { - select { - case <-ticker.C: - indexer.checkAvailability() - case <-indexer.stopTimer: - ticker.Stop() - return - } - } - }() - - return indexer, nil -} - -// Init will initialize the indexer -func (b *MeilisearchIndexer) Init() (bool, error) { - _, err := b.client.GetIndex(b.indexerName) - if err == nil { - return true, nil - } - _, err = b.client.CreateIndex(&meilisearch.IndexConfig{ - Uid: b.indexerName, - PrimaryKey: "id", - }) - if err != nil { - return false, b.checkError(err) - } - - _, err = b.client.Index(b.indexerName).UpdateFilterableAttributes(&[]string{"repo_id"}) - return false, b.checkError(err) -} - -// Ping checks if meilisearch is available -func (b *MeilisearchIndexer) Ping() bool { - b.lock.RLock() - defer b.lock.RUnlock() - return b.available -} - -// Index will save the index data -func (b *MeilisearchIndexer) Index(issues []*IndexerData) error { - if len(issues) == 0 { - return nil - } - for _, issue := range issues { - _, err := b.client.Index(b.indexerName).AddDocuments(issue) - if err != nil { - return b.checkError(err) - } - } - // TODO: bulk send index data - return nil -} - -// Delete 
deletes indexes by ids -func (b *MeilisearchIndexer) Delete(ids ...int64) error { - if len(ids) == 0 { - return nil - } - - for _, id := range ids { - _, err := b.client.Index(b.indexerName).DeleteDocument(strconv.FormatInt(id, 10)) - if err != nil { - return b.checkError(err) - } - } - // TODO: bulk send deletes - return nil -} - -// Search searches for issues by given conditions. -// Returns the matching issue IDs -func (b *MeilisearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { - repoFilters := make([]string, 0, len(repoIDs)) - for _, repoID := range repoIDs { - repoFilters = append(repoFilters, "repo_id = "+strconv.FormatInt(repoID, 10)) - } - filter := strings.Join(repoFilters, " OR ") - searchRes, err := b.client.Index(b.indexerName).Search(keyword, &meilisearch.SearchRequest{ - Filter: filter, - Limit: int64(limit), - Offset: int64(start), - }) - if err != nil { - return nil, b.checkError(err) - } - - hits := make([]Match, 0, len(searchRes.Hits)) - for _, hit := range searchRes.Hits { - hits = append(hits, Match{ - ID: int64(hit.(map[string]interface{})["id"].(float64)), - }) - } - return &SearchResult{ - Total: searchRes.TotalHits, - Hits: hits, - }, nil -} - -// Close implements indexer -func (b *MeilisearchIndexer) Close() { - select { - case <-b.stopTimer: - default: - close(b.stopTimer) - } -} - -func (b *MeilisearchIndexer) checkError(err error) error { - return err -} - -func (b *MeilisearchIndexer) checkAvailability() { - _, err := b.client.Health() - if err != nil { - b.setAvailability(false) - return - } - b.setAvailability(true) -} - -func (b *MeilisearchIndexer) setAvailability(available bool) { - b.lock.Lock() - defer b.lock.Unlock() - - if b.available == available { - return - } - - b.available = available -} diff --git a/modules/indexer/issues/meilisearch/meilisearch.go b/modules/indexer/issues/meilisearch/meilisearch.go new file mode 100644 index 0000000000..877c04f1dc --- 
/dev/null +++ b/modules/indexer/issues/meilisearch/meilisearch.go @@ -0,0 +1,98 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "context" + "strconv" + "strings" + + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_meilisearch "code.gitea.io/gitea/modules/indexer/internal/meilisearch" + "code.gitea.io/gitea/modules/indexer/issues/internal" + + "github.com/meilisearch/meilisearch-go" +) + +const ( + issueIndexerLatestVersion = 0 +) + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_meilisearch.Indexer + indexer_internal.Indexer // do not composite inner_meilisearch.Indexer directly to avoid exposing too much +} + +// NewIndexer creates a new meilisearch indexer +func NewIndexer(url, apiKey, indexerName string) *Indexer { + inner := inner_meilisearch.NewIndexer(url, apiKey, indexerName, issueIndexerLatestVersion) + indexer := &Indexer{ + inner: inner, + Indexer: inner, + } + return indexer +} + +// Index will save the index data +func (b *Indexer) Index(_ context.Context, issues []*internal.IndexerData) error { + if len(issues) == 0 { + return nil + } + for _, issue := range issues { + _, err := b.inner.Client.Index(b.inner.VersionedIndexName()).AddDocuments(issue) + if err != nil { + return err + } + } + // TODO: bulk send index data + return nil +} + +// Delete deletes indexes by ids +func (b *Indexer) Delete(_ context.Context, ids ...int64) error { + if len(ids) == 0 { + return nil + } + + for _, id := range ids { + _, err := b.inner.Client.Index(b.inner.VersionedIndexName()).DeleteDocument(strconv.FormatInt(id, 10)) + if err != nil { + return err + } + } + // TODO: bulk send deletes + return nil +} + +// Search searches for issues by given conditions. 
+// Returns the matching issue IDs +func (b *Indexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*internal.SearchResult, error) { + repoFilters := make([]string, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoFilters = append(repoFilters, "repo_id = "+strconv.FormatInt(repoID, 10)) + } + filter := strings.Join(repoFilters, " OR ") + searchRes, err := b.inner.Client.Index(b.inner.VersionedIndexName()).Search(keyword, &meilisearch.SearchRequest{ + Filter: filter, + Limit: int64(limit), + Offset: int64(start), + }) + if err != nil { + return nil, err + } + + hits := make([]internal.Match, 0, len(searchRes.Hits)) + for _, hit := range searchRes.Hits { + hits = append(hits, internal.Match{ + ID: int64(hit.(map[string]interface{})["id"].(float64)), + }) + } + return &internal.SearchResult{ + Total: searchRes.TotalHits, + Hits: hits, + }, nil +} diff --git a/modules/indexer/stats/indexer.go b/modules/indexer/stats/indexer.go index 1c01e25e29..6bfa8bdedb 100644 --- a/modules/indexer/stats/indexer.go +++ b/modules/indexer/stats/indexer.go @@ -11,6 +11,7 @@ import ( ) // Indexer defines an interface to index repository stats +// TODO: this indexer is quite different from the others, maybe this package should be moved out from module/indexer type Indexer interface { Index(id int64) error Close() |