aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues/indexer.go
diff options
context:
space:
mode:
authorJason Song <i@wolfogre.com>2023-06-23 20:37:56 +0800
committerGitHub <noreply@github.com>2023-06-23 12:37:56 +0000
commit375fd15fbfc29787323840688bacdfa0a9e9e414 (patch)
tree69c82fdad0fad24e25d2b6ddd2e5cb7766b9768b /modules/indexer/issues/indexer.go
parentb0215c40cdf9a3e46a45a3823b894998d1044cda (diff)
downloadgitea-375fd15fbfc29787323840688bacdfa0a9e9e414.tar.gz
gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.zip
Refactor indexer (#25174)
Refactor `modules/indexer` to make it more maintainable. And it can be easier to support more features. I'm trying to solve some of issue searching, this is a precursor to making functional changes. Current supported engines and the index versions: | engines | issues | code | | - | - | - | | db | Just a wrapper for database queries, doesn't need version | - | | bleve | The version of index is **2** | The version of index is **6** | | elasticsearch | The old index has no version, will be treated as version **0** in this PR | The version of index is **1** | | meilisearch | The old index has no version, will be treated as version **0** in this PR | - | ## Changes ### Split Splited it into mutiple packages ```text indexer ├── internal │   ├── bleve │   ├── db │   ├── elasticsearch │   └── meilisearch ├── code │   ├── bleve │   ├── elasticsearch │   └── internal └── issues ├── bleve ├── db ├── elasticsearch ├── internal └── meilisearch ``` - `indexer/interanal`: Internal shared package for indexer. - `indexer/interanal/[engine]`: Internal shared package for each engine (bleve/db/elasticsearch/meilisearch). - `indexer/code`: Implementations for code indexer. - `indexer/code/internal`: Internal shared package for code indexer. - `indexer/code/[engine]`: Implementation via each engine for code indexer. - `indexer/issues`: Implementations for issues indexer. ### Deduplication - Combine `Init/Ping/Close` for code indexer and issues indexer. - ~Combine `issues.indexerHolder` and `code.wrappedIndexer` to `internal.IndexHolder`.~ Remove it, use dummy indexer instead when the indexer is not ready. - Duplicate two copies of creating ES clients. - Duplicate two copies of `indexerID()`. ### Enhancement - [x] Support index version for elasticsearch issues indexer, the old index without version will be treated as version 0. - [x] Fix spell of `elastic_search/ElasticSearch`, it should be `Elasticsearch`. - [x] Improve versioning of ES index. 
We don't need `Aliases`: - Gitea doesn't need aliases for "Zero Downtime" because it never deletes old indexes. - The old code of issues indexer uses the original name to create issue index, so it's tricky to convert it to an alias. - [x] Support index version for meilisearch issues indexer, the old index without version will be treated as version 0. - [x] Do "ping" only when `Ping` has been called, don't ping periodically and cache the status. - [x] Support the context parameter whenever possible. - [x] Fix outdated example config. - [x] Give up the requeue logic of issues indexer: When indexing fails, call Ping to check if it was caused by the engine being unavailable, and only requeue the task if the engine is unavailable. - It is fragile and tricky, could cause data loss (It did happen when I was doing some tests for this PR). And it works for ES only. - Just always requeue the failed task; if it's caused by bad data, it's a bug of Gitea which should be fixed. --------- Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r--modules/indexer/issues/indexer.go199
1 files changed, 55 insertions, 144 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index f36ea10935..9e2f13371e 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -5,16 +5,20 @@ package issues
import (
"context"
- "fmt"
"os"
"runtime/pprof"
- "sync"
+ "sync/atomic"
"time"
- "code.gitea.io/gitea/models/db"
+ db_model "code.gitea.io/gitea/models/db"
issues_model "code.gitea.io/gitea/models/issues"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/indexer/issues/bleve"
+ "code.gitea.io/gitea/modules/indexer/issues/db"
+ "code.gitea.io/gitea/modules/indexer/issues/elasticsearch"
+ "code.gitea.io/gitea/modules/indexer/issues/internal"
+ "code.gitea.io/gitea/modules/indexer/issues/meilisearch"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
@@ -22,81 +26,22 @@ import (
"code.gitea.io/gitea/modules/util"
)
-// IndexerData data stored in the issue indexer
-type IndexerData struct {
- ID int64 `json:"id"`
- RepoID int64 `json:"repo_id"`
- Title string `json:"title"`
- Content string `json:"content"`
- Comments []string `json:"comments"`
- IsDelete bool `json:"is_delete"`
- IDs []int64 `json:"ids"`
-}
-
-// Match represents on search result
-type Match struct {
- ID int64 `json:"id"`
- Score float64 `json:"score"`
-}
-
-// SearchResult represents search results
-type SearchResult struct {
- Total int64
- Hits []Match
-}
-
-// Indexer defines an interface to indexer issues contents
-type Indexer interface {
- Init() (bool, error)
- Ping() bool
- Index(issue []*IndexerData) error
- Delete(ids ...int64) error
- Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
- Close()
-}
-
-type indexerHolder struct {
- indexer Indexer
- mutex sync.RWMutex
- cond *sync.Cond
- cancelled bool
-}
-
-func newIndexerHolder() *indexerHolder {
- h := &indexerHolder{}
- h.cond = sync.NewCond(h.mutex.RLocker())
- return h
-}
-
-func (h *indexerHolder) cancel() {
- h.mutex.Lock()
- defer h.mutex.Unlock()
- h.cancelled = true
- h.cond.Broadcast()
-}
-
-func (h *indexerHolder) set(indexer Indexer) {
- h.mutex.Lock()
- defer h.mutex.Unlock()
- h.indexer = indexer
- h.cond.Broadcast()
-}
-
-func (h *indexerHolder) get() Indexer {
- h.mutex.RLock()
- defer h.mutex.RUnlock()
- if h.indexer == nil && !h.cancelled {
- h.cond.Wait()
- }
- return h.indexer
-}
-
var (
// issueIndexerQueue queue of issue ids to be updated
- issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData]
- holder = newIndexerHolder()
+ issueIndexerQueue *queue.WorkerPoolQueue[*internal.IndexerData]
+ // globalIndexer is the global indexer, it cannot be nil.
+ // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready.
+ // So it's always safe use it as *globalIndexer.Load() and call its methods.
+ globalIndexer atomic.Pointer[internal.Indexer]
+ dummyIndexer *internal.Indexer
)
+func init() {
+ i := internal.NewDummyIndexer()
+ dummyIndexer = &i
+ globalIndexer.Store(dummyIndexer)
+}
+
// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
// all issue index done.
func InitIssueIndexer(syncReindex bool) {
@@ -107,33 +52,23 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
case "bleve", "elasticsearch", "meilisearch":
- handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
- indexer := holder.get()
- if indexer == nil {
- log.Warn("Issue indexer handler: indexer is not ready, retry later.")
- return items
- }
- toIndex := make([]*IndexerData, 0, len(items))
+ handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
+ indexer := *globalIndexer.Load()
+ toIndex := make([]*internal.IndexerData, 0, len(items))
for _, indexerData := range items {
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
if indexerData.IsDelete {
- if err := indexer.Delete(indexerData.IDs...); err != nil {
+ if err := indexer.Delete(ctx, indexerData.IDs...); err != nil {
log.Error("Issue indexer handler: failed to from index: %v Error: %v", indexerData.IDs, err)
- if !indexer.Ping() {
- log.Error("Issue indexer handler: indexer is unavailable when deleting")
- unhandled = append(unhandled, indexerData)
- }
+ unhandled = append(unhandled, indexerData)
}
continue
}
toIndex = append(toIndex, indexerData)
}
- if err := indexer.Index(toIndex); err != nil {
+ if err := indexer.Index(ctx, toIndex); err != nil {
log.Error("Error whilst indexing: %v Error: %v", toIndex, err)
- if !indexer.Ping() {
- log.Error("Issue indexer handler: indexer is unavailable when indexing")
- unhandled = append(unhandled, toIndex...)
- }
+ unhandled = append(unhandled, toIndex...)
}
return unhandled
}
@@ -144,7 +79,7 @@ func InitIssueIndexer(syncReindex bool) {
log.Fatal("Unable to create issue indexer queue")
}
default:
- issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil)
+ issueIndexerQueue = queue.CreateSimpleQueue[*internal.IndexerData](ctx, "issue_indexer", nil)
}
graceful.GetManager().RunAtTerminate(finished)
@@ -154,7 +89,11 @@ func InitIssueIndexer(syncReindex bool) {
pprof.SetGoroutineLabels(ctx)
start := time.Now()
log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
- var populate bool
+ var (
+ issueIndexer internal.Indexer
+ existed bool
+ err error
+ )
switch setting.Indexer.IssueType {
case "bleve":
defer func() {
@@ -162,62 +101,45 @@ func InitIssueIndexer(syncReindex bool) {
log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2))
log.Error("The indexer files are likely corrupted and may need to be deleted")
log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.IssuePath)
- holder.cancel()
+ globalIndexer.Store(dummyIndexer)
log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err)
}
}()
- issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
- exist, err := issueIndexer.Init()
+ issueIndexer = bleve.NewIndexer(setting.Indexer.IssuePath)
+ existed, err = issueIndexer.Init(ctx)
if err != nil {
- holder.cancel()
log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err)
}
- populate = !exist
- holder.set(issueIndexer)
- graceful.GetManager().RunAtTerminate(func() {
- log.Debug("Closing issue indexer")
- issueIndexer := holder.get()
- if issueIndexer != nil {
- issueIndexer.Close()
- }
- log.Info("PID: %d Issue Indexer closed", os.Getpid())
- })
- log.Debug("Created Bleve Indexer")
case "elasticsearch":
- issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
- if err != nil {
- log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- exist, err := issueIndexer.Init()
+ issueIndexer = elasticsearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
+ existed, err = issueIndexer.Init(ctx)
if err != nil {
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
}
- populate = !exist
- holder.set(issueIndexer)
case "db":
- issueIndexer := &DBIndexer{}
- holder.set(issueIndexer)
+ issueIndexer = db.NewIndexer()
case "meilisearch":
- issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
- if err != nil {
- log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- exist, err := issueIndexer.Init()
+ issueIndexer = meilisearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
+ existed, err = issueIndexer.Init(ctx)
if err != nil {
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
}
- populate = !exist
- holder.set(issueIndexer)
default:
- holder.cancel()
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
+ globalIndexer.Store(&issueIndexer)
+
+ graceful.GetManager().RunAtTerminate(func() {
+ log.Debug("Closing issue indexer")
+ (*globalIndexer.Load()).Close()
+ log.Info("PID: %d Issue Indexer closed", os.Getpid())
+ })
// Start processing the queue
go graceful.GetManager().RunWithCancel(issueIndexerQueue)
// Populate the index
- if populate {
+ if !existed {
if syncReindex {
graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
} else {
@@ -266,8 +188,8 @@ func populateIssueIndexer(ctx context.Context) {
default:
}
repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{
- ListOptions: db.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize},
- OrderBy: db.SearchOrderByID,
+ ListOptions: db_model.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize},
+ OrderBy: db_model.SearchOrderByID,
Private: true,
Collaborate: util.OptionalBoolFalse,
})
@@ -320,7 +242,7 @@ func UpdateIssueIndexer(issue *issues_model.Issue) {
comments = append(comments, comment.Content)
}
}
- indexerData := &IndexerData{
+ indexerData := &internal.IndexerData{
ID: issue.ID,
RepoID: issue.RepoID,
Title: issue.Title,
@@ -345,7 +267,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) {
if len(ids) == 0 {
return
}
- indexerData := &IndexerData{
+ indexerData := &internal.IndexerData{
IDs: ids,
IsDelete: true,
}
@@ -358,12 +280,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) {
// WARNNING: You have to ensure user have permission to visit repoIDs' issues
func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
- indexer := holder.get()
-
- if indexer == nil {
- log.Error("SearchIssuesByKeyword(): unable to get indexer!")
- return nil, fmt.Errorf("unable to get issue indexer")
- }
+ indexer := *globalIndexer.Load()
res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0)
if err != nil {
return nil, err
@@ -375,12 +292,6 @@ func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string)
}
// IsAvailable checks if issue indexer is available
-func IsAvailable() bool {
- indexer := holder.get()
- if indexer == nil {
- log.Error("IsAvailable(): unable to get indexer!")
- return false
- }
-
- return indexer.Ping()
+func IsAvailable(ctx context.Context) bool {
+ return (*globalIndexer.Load()).Ping(ctx) == nil
}