author    | Jason Song <i@wolfogre.com> | 2023-06-23 20:37:56 +0800
committer | GitHub <noreply@github.com> | 2023-06-23 12:37:56 +0000
commit    | 375fd15fbfc29787323840688bacdfa0a9e9e414 (patch)
tree      | 69c82fdad0fad24e25d2b6ddd2e5cb7766b9768b /modules/indexer/code/indexer.go
parent    | b0215c40cdf9a3e46a45a3823b894998d1044cda (diff)
download  | gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.tar.gz gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.zip
Refactor indexer (#25174)
Refactor `modules/indexer` to make it more maintainable and easier to extend with more features. I'm trying to solve some issue-searching problems; this is a precursor to making those functional changes.
Currently supported engines and their index versions:

| engines | issues | code |
| - | - | - |
| db | Just a wrapper for database queries, doesn't need a version | - |
| bleve | The version of the index is **2** | The version of the index is **6** |
| elasticsearch | The old index has no version, will be treated as version **0** in this PR | The version of the index is **1** |
| meilisearch | The old index has no version, will be treated as version **0** in this PR | - |
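For illustration only, here is one way an index version like those above could be turned into a concrete, versioned index name, so that bumping the version simply creates a new index alongside the old one. The function name and the `.v<N>` suffix are assumptions made for this sketch, not necessarily what the engine packages implement.

```go
package main

import "fmt"

// versionedIndexName is a hypothetical helper: it appends the index
// version to the configured base name, so a version bump creates a
// brand-new index instead of migrating (or aliasing) the old one.
func versionedIndexName(base string, version int) string {
	if version == 0 {
		// A pre-existing index without a version is treated as version 0
		// and keeps its original name.
		return base
	}
	return fmt.Sprintf("%s.v%d", base, version)
}

func main() {
	fmt.Println(versionedIndexName("gitea_codes", 0)) // gitea_codes
	fmt.Println(versionedIndexName("gitea_codes", 6)) // gitea_codes.v6
}
```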
## Changes
### Split
Split it into multiple packages:
```text
indexer
├── internal
│ ├── bleve
│ ├── db
│ ├── elasticsearch
│ └── meilisearch
├── code
│ ├── bleve
│ ├── elasticsearch
│ └── internal
└── issues
├── bleve
├── db
├── elasticsearch
├── internal
└── meilisearch
```
- `indexer/internal`: Internal shared package for the indexer (see the interface sketch after this list).
- `indexer/internal/[engine]`: Internal shared package for each engine (bleve/db/elasticsearch/meilisearch).
- `indexer/code`: Implementations for the code indexer.
- `indexer/code/internal`: Internal shared package for the code indexer.
- `indexer/code/[engine]`: Implementation of the code indexer via each engine.
- `indexer/issues`: Implementations for the issues indexer.
- `indexer/issues/internal`: Internal shared package for the issues indexer.
- `indexer/issues/[engine]`: Implementation of the issues indexer via each engine.
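To make the layering concrete, here is a rough sketch of how the shared base contract and the code-specific contract could relate under this layout. `Init`/`Ping`/`Close`/`Delete` are inferred from the calls visible in the diff at the bottom of this page; the names `CodeIndexer` and `RepoChanges` and the exact signatures are illustrative stand-ins, not the real declarations in `indexer/internal` and `indexer/code/internal`.

```go
// Sketch only: in the real tree the base interface lives in
// modules/indexer/internal and the code-specific one in
// modules/indexer/code/internal.
package sketch

import "context"

// Indexer is the engine-agnostic contract shared by all engines
// (bleve/db/elasticsearch/meilisearch).
type Indexer interface {
	// Init reports whether the index already existed, so the caller can
	// decide whether it needs to be populated from scratch.
	Init(ctx context.Context) (existed bool, err error)
	Ping(ctx context.Context) error
	Close()
}

// RepoChanges is a stand-in for the real change-set type.
type RepoChanges struct{}

// CodeIndexer layers code-search operations on top of the base contract;
// each engine package (code/bleve, code/elasticsearch) implements it.
type CodeIndexer interface {
	Indexer
	Index(ctx context.Context, repoID int64, sha string, changes *RepoChanges) error
	Delete(ctx context.Context, repoID int64) error
}
```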
### Deduplication
- Combine `Init/Ping/Close` for code indexer and issues indexer.
- ~Combine `issues.indexerHolder` and `code.wrappedIndexer` into `internal.IndexHolder`.~ Removed; a dummy indexer is used instead when the real indexer is not ready (see the sketch after this list).
- Deduplicate the two copies of ES client creation.
- Deduplicate the two copies of `indexerID()`.
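A minimal, self-contained sketch of the dummy-indexer pattern, assuming simplified types (the real interface and dummy live in `modules/indexer/internal`): a global `atomic.Pointer` always holds some indexer, and until the real engine is initialized it holds a dummy whose methods simply return a "not ready" error, so callers never need a nil check or a holder type.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// Indexer is a simplified stand-in for the shared indexer interface.
type Indexer interface {
	Ping(ctx context.Context) error
}

// dummyIndexer is stored until the real engine finishes initializing.
type dummyIndexer struct{}

func (dummyIndexer) Ping(context.Context) error {
	return errors.New("indexer is not ready")
}

// realIndexer stands in for a fully initialized engine.
type realIndexer struct{}

func (realIndexer) Ping(context.Context) error { return nil }

var globalIndexer atomic.Pointer[Indexer]

func init() {
	// Start with the dummy so *globalIndexer.Load() is always safe to call.
	var i Indexer = dummyIndexer{}
	globalIndexer.Store(&i)
}

func main() {
	ctx := context.Background()
	fmt.Println((*globalIndexer.Load()).Ping(ctx)) // indexer is not ready

	// Swap in the real indexer once initialization succeeds,
	// mirroring globalIndexer.Store(&rIndexer) in the diff below.
	var r Indexer = realIndexer{}
	globalIndexer.Store(&r)
	fmt.Println((*globalIndexer.Load()).Ping(ctx)) // <nil>
}
```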
### Enhancement
- [x] Support index version for the elasticsearch issues indexer; an old index without a version will be treated as version 0.
- [x] Fix the spelling of `elastic_search/ElasticSearch`; it should be `Elasticsearch`.
- [x] Improve versioning of the ES index. We don't need `Aliases`:
  - Gitea doesn't need aliases for "Zero Downtime" because it never deletes old indexes.
  - The old code of the issues indexer uses the original name to create the issue index, so it's tricky to convert it to an alias.
- [x] Support index version for the meilisearch issues indexer; an old index without a version will be treated as version 0.
- [x] Do "ping" only when `Ping` has been called; don't ping periodically and cache the status.
- [x] Support the context parameter whenever possible.
- [x] Fix the outdated example config.
- [x] Give up the requeue logic of the issues indexer (when indexing fails, call `Ping` to check whether the engine is unavailable, and requeue the task only if it is):
  - It is fragile and tricky, and could cause data loss (it did happen while I was testing this PR). And it works for ES only.
  - Just always requeue the failed task; if the failure is caused by bad data, that's a bug in Gitea which should be fixed (a minimal sketch of this handler behaviour follows the list).
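A minimal sketch of the simplified failure handling described in the last item, assuming stand-in types (`IndexerData` and the `index` stub here are simplified; the real handler is the queue handler shown in the diff below): a failed item is always returned as unhandled so the queue retries it, with no `Ping` probing.

```go
package main

import (
	"context"
	"fmt"
)

// IndexerData is a stand-in for internal.IndexerData.
type IndexerData struct{ RepoID int64 }

// index stands in for the real indexing call, which may fail for any reason.
func index(_ context.Context, d *IndexerData) error {
	if d.RepoID%2 == 0 {
		return fmt.Errorf("engine unavailable")
	}
	return nil
}

// handler mirrors the new behaviour: every failed item is appended to
// unhandled and requeued, instead of pinging the engine to decide
// whether a retry makes sense.
func handler(ctx context.Context, items ...*IndexerData) (unhandled []*IndexerData) {
	for _, d := range items {
		if err := index(ctx, d); err != nil {
			fmt.Printf("index error for repo %d: %v (will be requeued)\n", d.RepoID, err)
			unhandled = append(unhandled, d)
		}
	}
	return unhandled
}

func main() {
	retry := handler(context.Background(), &IndexerData{RepoID: 1}, &IndexerData{RepoID: 2})
	fmt.Println("requeued items:", len(retry))
}
```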
---------
Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules/indexer/code/indexer.go')
-rw-r--r-- | modules/indexer/code/indexer.go | 145
1 file changed, 47 insertions, 98 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index f38fd6000c..13d06874c9 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -7,86 +7,41 @@ import (
 	"context"
 	"os"
 	"runtime/pprof"
-	"strconv"
-	"strings"
+	"sync/atomic"
 	"time"
 
 	"code.gitea.io/gitea/models/db"
 	repo_model "code.gitea.io/gitea/models/repo"
 	"code.gitea.io/gitea/modules/graceful"
+	"code.gitea.io/gitea/modules/indexer/code/bleve"
+	"code.gitea.io/gitea/modules/indexer/code/elasticsearch"
+	"code.gitea.io/gitea/modules/indexer/code/internal"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/process"
 	"code.gitea.io/gitea/modules/queue"
 	"code.gitea.io/gitea/modules/setting"
-	"code.gitea.io/gitea/modules/timeutil"
 	"code.gitea.io/gitea/modules/util"
 )
 
-// SearchResult result of performing a search in a repo
-type SearchResult struct {
-	RepoID      int64
-	StartIndex  int
-	EndIndex    int
-	Filename    string
-	Content     string
-	CommitID    string
-	UpdatedUnix timeutil.TimeStamp
-	Language    string
-	Color       string
-}
-
-// SearchResultLanguages result of top languages count in search results
-type SearchResultLanguages struct {
-	Language string
-	Color    string
-	Count    int
-}
-
-// Indexer defines an interface to index and search code contents
-type Indexer interface {
-	Ping() bool
-	Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
-	Delete(repoID int64) error
-	Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
-	Close()
-}
-
-func filenameIndexerID(repoID int64, filename string) string {
-	return indexerID(repoID) + "_" + filename
-}
-
-func indexerID(id int64) string {
-	return strconv.FormatInt(id, 36)
-}
-
-func parseIndexerID(indexerID string) (int64, string) {
-	index := strings.IndexByte(indexerID, '_')
-	if index == -1 {
-		log.Error("Unexpected ID in repo indexer: %s", indexerID)
-	}
-	repoID, _ := strconv.ParseInt(indexerID[:index], 36, 64)
-	return repoID, indexerID[index+1:]
-}
-
-func filenameOfIndexerID(indexerID string) string {
-	index := strings.IndexByte(indexerID, '_')
-	if index == -1 {
-		log.Error("Unexpected ID in repo indexer: %s", indexerID)
-	}
-	return indexerID[index+1:]
-}
+var (
+	indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData]
+	// globalIndexer is the global indexer, it cannot be nil.
+	// When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready.
+	// So it's always safe use it as *globalIndexer.Load() and call its methods.
+	globalIndexer atomic.Pointer[internal.Indexer]
+	dummyIndexer  *internal.Indexer
+)
 
-// IndexerData represents data stored in the code indexer
-type IndexerData struct {
-	RepoID int64
+func init() {
+	i := internal.NewDummyIndexer()
+	dummyIndexer = &i
+	globalIndexer.Store(dummyIndexer)
 }
 
-var indexerQueue *queue.WorkerPoolQueue[*IndexerData]
-
-func index(ctx context.Context, indexer Indexer, repoID int64) error {
+func index(ctx context.Context, indexer internal.Indexer, repoID int64) error {
 	repo, err := repo_model.GetRepositoryByID(ctx, repoID)
 	if repo_model.IsErrRepoNotExist(err) {
-		return indexer.Delete(repoID)
+		return indexer.Delete(ctx, repoID)
 	}
 	if err != nil {
 		return err
 	}
@@ -139,7 +94,7 @@ func index(ctx context.Context, indexer Indexer, repoID int64) error {
 
 // Init initialize the repo indexer
 func Init() {
 	if !setting.Indexer.RepoIndexerEnabled {
-		indexer.Close()
+		(*globalIndexer.Load()).Close()
 		return
 	}
@@ -153,7 +108,7 @@ func Init() {
 		}
 		cancel()
 		log.Debug("Closing repository indexer")
-		indexer.Close()
+		(*globalIndexer.Load()).Close()
 		log.Info("PID: %d Repository Indexer closed", os.Getpid())
 		finished()
 	})
@@ -163,13 +118,8 @@ func Init() {
 	// Create the Queue
 	switch setting.Indexer.RepoType {
 	case "bleve", "elasticsearch":
-		handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
-			idx, err := indexer.get()
-			if idx == nil || err != nil {
-				log.Warn("Codes indexer handler: indexer is not ready, retry later.")
-				return items
-			}
-
+		handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
+			indexer := *globalIndexer.Load()
 			for _, indexerData := range items {
 				log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
 
@@ -188,11 +138,7 @@ func Init() {
 					code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105)
 				*/
 				if err := index(ctx, indexer, indexerData.RepoID); err != nil {
-					if !idx.Ping() {
-						log.Error("Code indexer handler: indexer is unavailable.")
-						unhandled = append(unhandled, indexerData)
-						continue
-					}
+					unhandled = append(unhandled, indexerData)
 					if !setting.IsInTesting {
 						log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
 					}
@@ -213,8 +159,8 @@ func Init() {
 		pprof.SetGoroutineLabels(ctx)
 		start := time.Now()
 		var (
-			rIndexer Indexer
-			populate bool
+			rIndexer internal.Indexer
+			existed  bool
 			err      error
 		)
 		switch setting.Indexer.RepoType {
@@ -228,10 +174,11 @@ func Init() {
 				}
 			}()
 
-			rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath)
+			rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath)
+			existed, err = rIndexer.Init(ctx)
 			if err != nil {
 				cancel()
-				indexer.Close()
+				(*globalIndexer.Load()).Close()
 				close(waitChannel)
 				log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
 			}
@@ -245,23 +192,31 @@ func Init() {
 				}
 			}()
 
-			rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName)
+			rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName)
+			if err != nil {
+				cancel()
+				(*globalIndexer.Load()).Close()
+				close(waitChannel)
+				log.Fatal("PID: %d Unable to create the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err)
+			}
+			existed, err = rIndexer.Init(ctx)
 			if err != nil {
 				cancel()
-				indexer.Close()
+				(*globalIndexer.Load()).Close()
 				close(waitChannel)
 				log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err)
 			}
+
 		default:
 			log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType)
 		}
 
-		indexer.set(rIndexer)
+		globalIndexer.Store(&rIndexer)
 
 		// Start processing the queue
 		go graceful.GetManager().RunWithCancel(indexerQueue)
 
-		if populate {
+		if !existed {
			// populate the index because it's created for the first time
 			go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
 		}
 		select {
@@ -283,18 +238,18 @@ func Init() {
 		case <-graceful.GetManager().IsShutdown():
 			log.Warn("Shutdown before Repository Indexer completed initialization")
 			cancel()
-			indexer.Close()
+			(*globalIndexer.Load()).Close()
 		case duration, ok := <-waitChannel:
 			if !ok {
 				log.Warn("Repository Indexer Initialization failed")
 				cancel()
-				indexer.Close()
+				(*globalIndexer.Load()).Close()
 				return
 			}
 			log.Info("Repository Indexer Initialization took %v", duration)
 		case <-time.After(timeout):
 			cancel()
-			indexer.Close()
+			(*globalIndexer.Load()).Close()
 			log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout)
 		}
 	}()
@@ -303,21 +258,15 @@ func Init() {
 
 // UpdateRepoIndexer update a repository's entries in the indexer
 func UpdateRepoIndexer(repo *repo_model.Repository) {
-	indexData := &IndexerData{RepoID: repo.ID}
+	indexData := &internal.IndexerData{RepoID: repo.ID}
 	if err := indexerQueue.Push(indexData); err != nil {
 		log.Error("Update repo index data %v failed: %v", indexData, err)
 	}
 }
 
 // IsAvailable checks if issue indexer is available
-func IsAvailable() bool {
-	idx, err := indexer.get()
-	if err != nil {
-		log.Error("IsAvailable(): unable to get indexer: %v", err)
-		return false
-	}
-
-	return idx.Ping()
+func IsAvailable(ctx context.Context) bool {
+	return (*globalIndexer.Load()).Ping(ctx) == nil
 }
 
 // populateRepoIndexer populate the repo indexer with pre-existing data. This
@@ -368,7 +317,7 @@ func populateRepoIndexer(ctx context.Context) {
 			return
 		default:
 		}
-		if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil {
+		if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil {
 			log.Error("indexerQueue.Push: %v", err)
 			return
 		}