diff options
author | Jason Song <i@wolfogre.com> | 2023-06-23 20:37:56 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-23 12:37:56 +0000 |
commit | 375fd15fbfc29787323840688bacdfa0a9e9e414 (patch) | |
tree | 69c82fdad0fad24e25d2b6ddd2e5cb7766b9768b /modules/indexer/issues/indexer.go | |
parent | b0215c40cdf9a3e46a45a3823b894998d1044cda (diff) | |
download | gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.tar.gz gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.zip |
Refactor indexer (#25174)
Refactor `modules/indexer` to make it more maintainable and to make it
easier to support more features. I'm trying to solve some issue-searching
problems, and this is a precursor to making functional changes.
Current supported engines and the index versions:
| engines | issues | code |
| - | - | - |
| db | Just a wrapper for database queries, doesn't need version | - |
| bleve | The version of index is **2** | The version of index is **6** |
| elasticsearch | The old index has no version, will be treated as version **0** in this PR | The version of index is **1** |
| meilisearch | The old index has no version, will be treated as version **0** in this PR | - |
## Changes
### Split
Split it into multiple packages:
```text
indexer
├── internal
│ ├── bleve
│ ├── db
│ ├── elasticsearch
│ └── meilisearch
├── code
│ ├── bleve
│ ├── elasticsearch
│ └── internal
└── issues
├── bleve
├── db
├── elasticsearch
├── internal
└── meilisearch
```
- `indexer/internal`: Internal shared package for indexer.
- `indexer/internal/[engine]`: Internal shared package for each engine
(bleve/db/elasticsearch/meilisearch).
- `indexer/code`: Implementations for code indexer.
- `indexer/code/internal`: Internal shared package for code indexer.
- `indexer/code/[engine]`: Implementation via each engine for code
indexer.
- `indexer/issues`: Implementations for issues indexer.
### Deduplication
- Combine `Init/Ping/Close` for code indexer and issues indexer.
- ~Combine `issues.indexerHolder` and `code.wrappedIndexer` to
`internal.IndexHolder`.~ Remove it, use dummy indexer instead when the
indexer is not ready.
- Deduplicate the two copies of creating ES clients.
- Deduplicate the two copies of `indexerID()`.
### Enhancement
- [x] Support index version for elasticsearch issues indexer, the old
index without version will be treated as version 0.
- [x] Fix the spelling of `elastic_search/ElasticSearch`, it should be
`Elasticsearch`.
- [x] Improve versioning of ES index. We don't need `Aliases`:
- Gitea doesn't need aliases for "Zero Downtime" because it never deletes
old indexes.
- The old code of issues indexer uses the original name to create issue
index, so it's tricky to convert it to an alias.
- [x] Support index version for meilisearch issues indexer, the old
index without version will be treated as version 0.
- [x] Do "ping" only when `Ping` has been called, don't ping
periodically and cache the status.
- [x] Support the context parameter whenever possible.
- [x] Fix outdated example config.
- [x] Give up the old requeue logic of issues indexer, which was: when
indexing fails, call Ping to check if it was caused by the engine being
unavailable, and only requeue the task if the engine is unavailable.
- It is fragile and tricky, and could cause data loss (it did happen when
I was doing some tests for this PR). And it works for ES only.
- Just always requeue the failed task; if the failure is caused by bad data,
it's a bug of Gitea which should be fixed.
---------
Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r-- | modules/indexer/issues/indexer.go | 199 |
1 files changed, 55 insertions, 144 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index f36ea10935..9e2f13371e 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,16 +5,20 @@ package issues import ( "context" - "fmt" "os" "runtime/pprof" - "sync" + "sync/atomic" "time" - "code.gitea.io/gitea/models/db" + db_model "code.gitea.io/gitea/models/db" issues_model "code.gitea.io/gitea/models/issues" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/issues/bleve" + "code.gitea.io/gitea/modules/indexer/issues/db" + "code.gitea.io/gitea/modules/indexer/issues/elasticsearch" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/indexer/issues/meilisearch" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/queue" @@ -22,81 +26,22 @@ import ( "code.gitea.io/gitea/modules/util" ) -// IndexerData data stored in the issue indexer -type IndexerData struct { - ID int64 `json:"id"` - RepoID int64 `json:"repo_id"` - Title string `json:"title"` - Content string `json:"content"` - Comments []string `json:"comments"` - IsDelete bool `json:"is_delete"` - IDs []int64 `json:"ids"` -} - -// Match represents on search result -type Match struct { - ID int64 `json:"id"` - Score float64 `json:"score"` -} - -// SearchResult represents search results -type SearchResult struct { - Total int64 - Hits []Match -} - -// Indexer defines an interface to indexer issues contents -type Indexer interface { - Init() (bool, error) - Ping() bool - Index(issue []*IndexerData) error - Delete(ids ...int64) error - Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) - Close() -} - -type indexerHolder struct { - indexer Indexer - mutex sync.RWMutex - cond *sync.Cond - cancelled bool -} - -func newIndexerHolder() *indexerHolder { - h := &indexerHolder{} - h.cond = 
sync.NewCond(h.mutex.RLocker()) - return h -} - -func (h *indexerHolder) cancel() { - h.mutex.Lock() - defer h.mutex.Unlock() - h.cancelled = true - h.cond.Broadcast() -} - -func (h *indexerHolder) set(indexer Indexer) { - h.mutex.Lock() - defer h.mutex.Unlock() - h.indexer = indexer - h.cond.Broadcast() -} - -func (h *indexerHolder) get() Indexer { - h.mutex.RLock() - defer h.mutex.RUnlock() - if h.indexer == nil && !h.cancelled { - h.cond.Wait() - } - return h.indexer -} - var ( // issueIndexerQueue queue of issue ids to be updated - issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData] - holder = newIndexerHolder() + issueIndexerQueue *queue.WorkerPoolQueue[*internal.IndexerData] + // globalIndexer is the global indexer, it cannot be nil. + // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. + // So it's always safe use it as *globalIndexer.Load() and call its methods. + globalIndexer atomic.Pointer[internal.Indexer] + dummyIndexer *internal.Indexer ) +func init() { + i := internal.NewDummyIndexer() + dummyIndexer = &i + globalIndexer.Store(dummyIndexer) +} + // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until // all issue index done. 
func InitIssueIndexer(syncReindex bool) { @@ -107,33 +52,23 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { case "bleve", "elasticsearch", "meilisearch": - handler := func(items ...*IndexerData) (unhandled []*IndexerData) { - indexer := holder.get() - if indexer == nil { - log.Warn("Issue indexer handler: indexer is not ready, retry later.") - return items - } - toIndex := make([]*IndexerData, 0, len(items)) + handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) { + indexer := *globalIndexer.Load() + toIndex := make([]*internal.IndexerData, 0, len(items)) for _, indexerData := range items { log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) if indexerData.IsDelete { - if err := indexer.Delete(indexerData.IDs...); err != nil { + if err := indexer.Delete(ctx, indexerData.IDs...); err != nil { log.Error("Issue indexer handler: failed to from index: %v Error: %v", indexerData.IDs, err) - if !indexer.Ping() { - log.Error("Issue indexer handler: indexer is unavailable when deleting") - unhandled = append(unhandled, indexerData) - } + unhandled = append(unhandled, indexerData) } continue } toIndex = append(toIndex, indexerData) } - if err := indexer.Index(toIndex); err != nil { + if err := indexer.Index(ctx, toIndex); err != nil { log.Error("Error whilst indexing: %v Error: %v", toIndex, err) - if !indexer.Ping() { - log.Error("Issue indexer handler: indexer is unavailable when indexing") - unhandled = append(unhandled, toIndex...) - } + unhandled = append(unhandled, toIndex...) 
} return unhandled } @@ -144,7 +79,7 @@ func InitIssueIndexer(syncReindex bool) { log.Fatal("Unable to create issue indexer queue") } default: - issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil) + issueIndexerQueue = queue.CreateSimpleQueue[*internal.IndexerData](ctx, "issue_indexer", nil) } graceful.GetManager().RunAtTerminate(finished) @@ -154,7 +89,11 @@ func InitIssueIndexer(syncReindex bool) { pprof.SetGoroutineLabels(ctx) start := time.Now() log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType) - var populate bool + var ( + issueIndexer internal.Indexer + existed bool + err error + ) switch setting.Indexer.IssueType { case "bleve": defer func() { @@ -162,62 +101,45 @@ func InitIssueIndexer(syncReindex bool) { log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2)) log.Error("The indexer files are likely corrupted and may need to be deleted") log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.IssuePath) - holder.cancel() + globalIndexer.Store(dummyIndexer) log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err) } }() - issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() + issueIndexer = bleve.NewIndexer(setting.Indexer.IssuePath) + existed, err = issueIndexer.Init(ctx) if err != nil { - holder.cancel() log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err) } - populate = !exist - holder.set(issueIndexer) - graceful.GetManager().RunAtTerminate(func() { - log.Debug("Closing issue indexer") - issueIndexer := holder.get() - if issueIndexer != nil { - issueIndexer.Close() - } - log.Info("PID: %d Issue Indexer closed", os.Getpid()) - }) - log.Debug("Created Bleve Indexer") case "elasticsearch": - issueIndexer, err := 
NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() + issueIndexer = elasticsearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) + existed, err = issueIndexer.Init(ctx) if err != nil { log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) } - populate = !exist - holder.set(issueIndexer) case "db": - issueIndexer := &DBIndexer{} - holder.set(issueIndexer) + issueIndexer = db.NewIndexer() case "meilisearch": - issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() + issueIndexer = meilisearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) + existed, err = issueIndexer.Init(ctx) if err != nil { log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) } - populate = !exist - holder.set(issueIndexer) default: - holder.cancel() log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } + globalIndexer.Store(&issueIndexer) + + graceful.GetManager().RunAtTerminate(func() { + log.Debug("Closing issue indexer") + (*globalIndexer.Load()).Close() + log.Info("PID: %d Issue Indexer closed", os.Getpid()) + }) // Start processing the queue go graceful.GetManager().RunWithCancel(issueIndexerQueue) // Populate the index - if populate { + if !existed { if syncReindex { graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } else { @@ -266,8 +188,8 @@ func populateIssueIndexer(ctx 
context.Context) { default: } repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{ - ListOptions: db.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize}, - OrderBy: db.SearchOrderByID, + ListOptions: db_model.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize}, + OrderBy: db_model.SearchOrderByID, Private: true, Collaborate: util.OptionalBoolFalse, }) @@ -320,7 +242,7 @@ func UpdateIssueIndexer(issue *issues_model.Issue) { comments = append(comments, comment.Content) } } - indexerData := &IndexerData{ + indexerData := &internal.IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, @@ -345,7 +267,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) { if len(ids) == 0 { return } - indexerData := &IndexerData{ + indexerData := &internal.IndexerData{ IDs: ids, IsDelete: true, } @@ -358,12 +280,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) { // WARNNING: You have to ensure user have permission to visit repoIDs' issues func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) { var issueIDs []int64 - indexer := holder.get() - - if indexer == nil { - log.Error("SearchIssuesByKeyword(): unable to get indexer!") - return nil, fmt.Errorf("unable to get issue indexer") - } + indexer := *globalIndexer.Load() res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0) if err != nil { return nil, err @@ -375,12 +292,6 @@ func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) } // IsAvailable checks if issue indexer is available -func IsAvailable() bool { - indexer := holder.get() - if indexer == nil { - log.Error("IsAvailable(): unable to get indexer!") - return false - } - - return indexer.Ping() +func IsAvailable(ctx context.Context) bool { + return (*globalIndexer.Load()).Ping(ctx) == nil } |