Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r-- | modules/indexer/issues/indexer.go | 199
1 file changed, 55 insertions, 144 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index f36ea10935..9e2f13371e 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -5,16 +5,20 @@ package issues
 
 import (
 	"context"
-	"fmt"
 	"os"
 	"runtime/pprof"
-	"sync"
+	"sync/atomic"
 	"time"
 
-	"code.gitea.io/gitea/models/db"
+	db_model "code.gitea.io/gitea/models/db"
 	issues_model "code.gitea.io/gitea/models/issues"
 	repo_model "code.gitea.io/gitea/models/repo"
 	"code.gitea.io/gitea/modules/graceful"
+	"code.gitea.io/gitea/modules/indexer/issues/bleve"
+	"code.gitea.io/gitea/modules/indexer/issues/db"
+	"code.gitea.io/gitea/modules/indexer/issues/elasticsearch"
+	"code.gitea.io/gitea/modules/indexer/issues/internal"
+	"code.gitea.io/gitea/modules/indexer/issues/meilisearch"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/process"
 	"code.gitea.io/gitea/modules/queue"
@@ -22,81 +26,22 @@ import (
 	"code.gitea.io/gitea/modules/util"
 )
 
-// IndexerData data stored in the issue indexer
-type IndexerData struct {
-	ID       int64    `json:"id"`
-	RepoID   int64    `json:"repo_id"`
-	Title    string   `json:"title"`
-	Content  string   `json:"content"`
-	Comments []string `json:"comments"`
-	IsDelete bool     `json:"is_delete"`
-	IDs      []int64  `json:"ids"`
-}
-
-// Match represents on search result
-type Match struct {
-	ID    int64   `json:"id"`
-	Score float64 `json:"score"`
-}
-
-// SearchResult represents search results
-type SearchResult struct {
-	Total int64
-	Hits  []Match
-}
-
-// Indexer defines an interface to indexer issues contents
-type Indexer interface {
-	Init() (bool, error)
-	Ping() bool
-	Index(issue []*IndexerData) error
-	Delete(ids ...int64) error
-	Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
-	Close()
-}
-
-type indexerHolder struct {
-	indexer   Indexer
-	mutex     sync.RWMutex
-	cond      *sync.Cond
-	cancelled bool
-}
-
-func newIndexerHolder() *indexerHolder {
-	h := &indexerHolder{}
-	h.cond = sync.NewCond(h.mutex.RLocker())
-	return h
-}
-
-func (h *indexerHolder) cancel() {
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-	h.cancelled = true
-	h.cond.Broadcast()
-}
-
-func (h *indexerHolder) set(indexer Indexer) {
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-	h.indexer = indexer
-	h.cond.Broadcast()
-}
-
-func (h *indexerHolder) get() Indexer {
-	h.mutex.RLock()
-	defer h.mutex.RUnlock()
-	if h.indexer == nil && !h.cancelled {
-		h.cond.Wait()
-	}
-	return h.indexer
-}
-
 var (
 	// issueIndexerQueue queue of issue ids to be updated
-	issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData]
-	holder            = newIndexerHolder()
+	issueIndexerQueue *queue.WorkerPoolQueue[*internal.IndexerData]
+	// globalIndexer is the global indexer, it cannot be nil.
+	// When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready.
+	// So it's always safe use it as *globalIndexer.Load() and call its methods.
+	globalIndexer atomic.Pointer[internal.Indexer]
+	dummyIndexer  *internal.Indexer
 )
 
+func init() {
+	i := internal.NewDummyIndexer()
+	dummyIndexer = &i
+	globalIndexer.Store(dummyIndexer)
+}
+
 // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
 // all issue index done.
 func InitIssueIndexer(syncReindex bool) {
@@ -107,33 +52,23 @@ func InitIssueIndexer(syncReindex bool) {
 	// Create the Queue
 	switch setting.Indexer.IssueType {
 	case "bleve", "elasticsearch", "meilisearch":
-		handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
-			indexer := holder.get()
-			if indexer == nil {
-				log.Warn("Issue indexer handler: indexer is not ready, retry later.")
-				return items
-			}
-			toIndex := make([]*IndexerData, 0, len(items))
+		handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
+			indexer := *globalIndexer.Load()
+			toIndex := make([]*internal.IndexerData, 0, len(items))
 			for _, indexerData := range items {
 				log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
 				if indexerData.IsDelete {
-					if err := indexer.Delete(indexerData.IDs...); err != nil {
+					if err := indexer.Delete(ctx, indexerData.IDs...); err != nil {
 						log.Error("Issue indexer handler: failed to from index: %v Error: %v", indexerData.IDs, err)
-						if !indexer.Ping() {
-							log.Error("Issue indexer handler: indexer is unavailable when deleting")
-							unhandled = append(unhandled, indexerData)
-						}
+						unhandled = append(unhandled, indexerData)
 					}
 					continue
 				}
 				toIndex = append(toIndex, indexerData)
 			}
-			if err := indexer.Index(toIndex); err != nil {
+			if err := indexer.Index(ctx, toIndex); err != nil {
 				log.Error("Error whilst indexing: %v Error: %v", toIndex, err)
-				if !indexer.Ping() {
-					log.Error("Issue indexer handler: indexer is unavailable when indexing")
-					unhandled = append(unhandled, toIndex...)
-				}
+				unhandled = append(unhandled, toIndex...)
 			}
 			return unhandled
 		}
@@ -144,7 +79,7 @@ func InitIssueIndexer(syncReindex bool) {
 			log.Fatal("Unable to create issue indexer queue")
 		}
 	default:
-		issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil)
+		issueIndexerQueue = queue.CreateSimpleQueue[*internal.IndexerData](ctx, "issue_indexer", nil)
 	}
 
 	graceful.GetManager().RunAtTerminate(finished)
@@ -154,7 +89,11 @@ func InitIssueIndexer(syncReindex bool) {
 		pprof.SetGoroutineLabels(ctx)
 		start := time.Now()
 		log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
-		var populate bool
+		var (
+			issueIndexer internal.Indexer
+			existed      bool
+			err          error
+		)
 		switch setting.Indexer.IssueType {
 		case "bleve":
 			defer func() {
@@ -162,62 +101,45 @@ func InitIssueIndexer(syncReindex bool) {
 					log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2))
 					log.Error("The indexer files are likely corrupted and may need to be deleted")
 					log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.IssuePath)
-					holder.cancel()
+					globalIndexer.Store(dummyIndexer)
 					log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err)
 				}
 			}()
-			issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
-			exist, err := issueIndexer.Init()
+			issueIndexer = bleve.NewIndexer(setting.Indexer.IssuePath)
+			existed, err = issueIndexer.Init(ctx)
 			if err != nil {
-				holder.cancel()
 				log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err)
 			}
-			populate = !exist
-			holder.set(issueIndexer)
-			graceful.GetManager().RunAtTerminate(func() {
-				log.Debug("Closing issue indexer")
-				issueIndexer := holder.get()
-				if issueIndexer != nil {
-					issueIndexer.Close()
-				}
-				log.Info("PID: %d Issue Indexer closed", os.Getpid())
-			})
-			log.Debug("Created Bleve Indexer")
 		case "elasticsearch":
-			issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
-			if err != nil {
-				log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
-			}
-			exist, err := issueIndexer.Init()
+			issueIndexer = elasticsearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
+			existed, err = issueIndexer.Init(ctx)
 			if err != nil {
 				log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
 			}
-			populate = !exist
-			holder.set(issueIndexer)
 		case "db":
-			issueIndexer := &DBIndexer{}
-			holder.set(issueIndexer)
+			issueIndexer = db.NewIndexer()
 		case "meilisearch":
-			issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
-			if err != nil {
-				log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
-			}
-			exist, err := issueIndexer.Init()
+			issueIndexer = meilisearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
			existed, err = issueIndexer.Init(ctx)
 			if err != nil {
 				log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
 			}
-			populate = !exist
-			holder.set(issueIndexer)
 		default:
-			holder.cancel()
 			log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
 		}
+		globalIndexer.Store(&issueIndexer)
+
+		graceful.GetManager().RunAtTerminate(func() {
+			log.Debug("Closing issue indexer")
+			(*globalIndexer.Load()).Close()
+			log.Info("PID: %d Issue Indexer closed", os.Getpid())
+		})
 
 		// Start processing the queue
 		go graceful.GetManager().RunWithCancel(issueIndexerQueue)
 
 		// Populate the index
-		if populate {
+		if !existed {
 			if syncReindex {
 				graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
 			} else {
@@ -266,8 +188,8 @@ func populateIssueIndexer(ctx context.Context) {
 		default:
 		}
 		repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{
-			ListOptions: db.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize},
-			OrderBy:     db.SearchOrderByID,
+			ListOptions: db_model.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize},
+			OrderBy:     db_model.SearchOrderByID,
 			Private:     true,
 			Collaborate: util.OptionalBoolFalse,
 		})
@@ -320,7 +242,7 @@ func UpdateIssueIndexer(issue *issues_model.Issue) {
 			comments = append(comments, comment.Content)
 		}
 	}
-	indexerData := &IndexerData{
+	indexerData := &internal.IndexerData{
 		ID:       issue.ID,
 		RepoID:   issue.RepoID,
 		Title:    issue.Title,
@@ -345,7 +267,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) {
 	if len(ids) == 0 {
 		return
 	}
-	indexerData := &IndexerData{
+	indexerData := &internal.IndexerData{
 		IDs:      ids,
 		IsDelete: true,
 	}
@@ -358,12 +280,7 @@ func DeleteRepoIssueIndexer(ctx context.Context, repo *repo_model.Repository) {
 // WARNNING: You have to ensure user have permission to visit repoIDs' issues
 func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) {
 	var issueIDs []int64
-	indexer := holder.get()
-
-	if indexer == nil {
-		log.Error("SearchIssuesByKeyword(): unable to get indexer!")
-		return nil, fmt.Errorf("unable to get issue indexer")
-	}
+	indexer := *globalIndexer.Load()
 	res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0)
 	if err != nil {
 		return nil, err
@@ -375,12 +292,6 @@ func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string)
 }
 
 // IsAvailable checks if issue indexer is available
-func IsAvailable() bool {
-	indexer := holder.get()
-	if indexer == nil {
-		log.Error("IsAvailable(): unable to get indexer!")
-		return false
-	}
-
-	return indexer.Ping()
+func IsAvailable(ctx context.Context) bool {
+	return (*globalIndexer.Load()).Ping(ctx) == nil
 }
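For readers skimming the diff: the core change is that the old blocking indexerHolder (a mutex plus sync.Cond that callers had to wait on and nil-check) is replaced by an atomic.Pointer that always holds a non-nil indexer, seeded in init() with a dummy implementation whose methods simply report that the indexer is not ready. The standalone Go sketch below illustrates that pattern under simplified assumptions; the Indexer interface and the dummyIndexer/realIndexer types here are illustrative stand-ins, not Gitea's actual internal.Indexer API.

// Standalone sketch (assumed names, not Gitea code) of the
// "always non-nil indexer behind an atomic.Pointer" pattern used in the diff.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// Indexer is a reduced stand-in for internal.Indexer.
type Indexer interface {
	Ping(ctx context.Context) error
	Index(ctx context.Context, data string) error
}

// dummyIndexer answers every call with "not ready", so callers never need a
// nil check or a blocking wait while the real indexer is still starting.
type dummyIndexer struct{}

func (dummyIndexer) Ping(context.Context) error          { return errors.New("indexer is not ready") }
func (dummyIndexer) Index(context.Context, string) error { return errors.New("indexer is not ready") }

// globalIndexer always holds a non-nil *Indexer, mirroring the diff's
// atomic.Pointer[internal.Indexer].
var globalIndexer atomic.Pointer[Indexer]

func init() {
	var d Indexer = dummyIndexer{}
	globalIndexer.Store(&d)
}

// realIndexer is a placeholder for a bleve/elasticsearch/meilisearch backend.
type realIndexer struct{}

func (realIndexer) Ping(context.Context) error { return nil }
func (realIndexer) Index(_ context.Context, data string) error {
	fmt.Println("indexed:", data)
	return nil
}

func main() {
	ctx := context.Background()

	// Before initialization finishes: safe to call, but reports unavailable.
	fmt.Println("available:", (*globalIndexer.Load()).Ping(ctx) == nil)

	// Once the real backend is ready, swap it in atomically.
	var r Indexer = realIndexer{}
	globalIndexer.Store(&r)
	fmt.Println("available:", (*globalIndexer.Load()).Ping(ctx) == nil)
	_ = (*globalIndexer.Load()).Index(ctx, "issue #1")
}

Because the stored pointer is never nil, call sites such as IsAvailable and SearchIssuesByKeyword drop their nil checks, and the queue handler can simply re-queue items whose Index/Delete calls failed instead of probing Ping() first, as the changes above show.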