diff options
Diffstat (limited to 'modules/indexer/code/indexer.go')
-rw-r--r-- | modules/indexer/code/indexer.go | 145 |
1 files changed, 47 insertions, 98 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index f38fd6000c..13d06874c9 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -7,86 +7,41 @@ import ( "context" "os" "runtime/pprof" - "strconv" - "strings" + "sync/atomic" "time" "code.gitea.io/gitea/models/db" repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/code/bleve" + "code.gitea.io/gitea/modules/indexer/code/elasticsearch" + "code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" ) -// SearchResult result of performing a search in a repo -type SearchResult struct { - RepoID int64 - StartIndex int - EndIndex int - Filename string - Content string - CommitID string - UpdatedUnix timeutil.TimeStamp - Language string - Color string -} - -// SearchResultLanguages result of top languages count in search results -type SearchResultLanguages struct { - Language string - Color string - Count int -} - -// Indexer defines an interface to index and search code contents -type Indexer interface { - Ping() bool - Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error - Delete(repoID int64) error - Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) - Close() -} - -func filenameIndexerID(repoID int64, filename string) string { - return indexerID(repoID) + "_" + filename -} - -func indexerID(id int64) string { - return strconv.FormatInt(id, 36) -} - -func parseIndexerID(indexerID string) (int64, string) { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - repoID, _ := strconv.ParseInt(indexerID[:index], 36, 64) - return repoID, indexerID[index+1:] -} - -func filenameOfIndexerID(indexerID string) string { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - return indexerID[index+1:] -} +var ( + indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData] + // globalIndexer is the global indexer, it cannot be nil. + // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. + // So it's always safe use it as *globalIndexer.Load() and call its methods. + globalIndexer atomic.Pointer[internal.Indexer] + dummyIndexer *internal.Indexer +) -// IndexerData represents data stored in the code indexer -type IndexerData struct { - RepoID int64 +func init() { + i := internal.NewDummyIndexer() + dummyIndexer = &i + globalIndexer.Store(dummyIndexer) } -var indexerQueue *queue.WorkerPoolQueue[*IndexerData] - -func index(ctx context.Context, indexer Indexer, repoID int64) error { +func index(ctx context.Context, indexer internal.Indexer, repoID int64) error { repo, err := repo_model.GetRepositoryByID(ctx, repoID) if repo_model.IsErrRepoNotExist(err) { - return indexer.Delete(repoID) + return indexer.Delete(ctx, repoID) } if err != nil { return err @@ -139,7 +94,7 @@ func index(ctx context.Context, indexer Indexer, repoID int64) error { // Init initialize the repo indexer func Init() { if !setting.Indexer.RepoIndexerEnabled { - indexer.Close() + (*globalIndexer.Load()).Close() return } @@ -153,7 +108,7 @@ func Init() { } cancel() log.Debug("Closing repository indexer") - indexer.Close() + (*globalIndexer.Load()).Close() log.Info("PID: %d Repository Indexer closed", os.Getpid()) finished() }) @@ -163,13 +118,8 @@ func Init() { // Create the Queue switch setting.Indexer.RepoType { case "bleve", "elasticsearch": - handler := func(items ...*IndexerData) (unhandled []*IndexerData) { - idx, err := indexer.get() - if idx == nil || err != nil { - log.Warn("Codes indexer handler: indexer is not ready, retry later.") - return items - } - + handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) { + indexer := *globalIndexer.Load() for _, indexerData := range items { log.Trace("IndexerData Process Repo: %d", indexerData.RepoID) @@ -188,11 +138,7 @@ func Init() { code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105) */ if err := index(ctx, indexer, indexerData.RepoID); err != nil { - if !idx.Ping() { - log.Error("Code indexer handler: indexer is unavailable.") - unhandled = append(unhandled, indexerData) - continue - } + unhandled = append(unhandled, indexerData) if !setting.IsInTesting { log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err) } @@ -213,8 +159,8 @@ func Init() { pprof.SetGoroutineLabels(ctx) start := time.Now() var ( - rIndexer Indexer - populate bool + rIndexer internal.Indexer + existed bool err error ) switch setting.Indexer.RepoType { @@ -228,10 +174,11 @@ func Init() { } }() - rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath) + rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath) + existed, err = rIndexer.Init(ctx) if err != nil { cancel() - indexer.Close() + (*globalIndexer.Load()).Close() close(waitChannel) log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) } @@ -245,23 +192,31 @@ func Init() { } }() - rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + if err != nil { + cancel() + (*globalIndexer.Load()).Close() + close(waitChannel) + log.Fatal("PID: %d Unable to create the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) + } + existed, err = rIndexer.Init(ctx) if err != nil { cancel() - indexer.Close() + (*globalIndexer.Load()).Close() close(waitChannel) log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) } + default: log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType) } - indexer.set(rIndexer) + globalIndexer.Store(&rIndexer) // Start processing the queue go graceful.GetManager().RunWithCancel(indexerQueue) - if populate { + if !existed { // populate the index because it's created for the first time go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) } select { @@ -283,18 +238,18 @@ func Init() { case <-graceful.GetManager().IsShutdown(): log.Warn("Shutdown before Repository Indexer completed initialization") cancel() - indexer.Close() + (*globalIndexer.Load()).Close() case duration, ok := <-waitChannel: if !ok { log.Warn("Repository Indexer Initialization failed") cancel() - indexer.Close() + (*globalIndexer.Load()).Close() return } log.Info("Repository Indexer Initialization took %v", duration) case <-time.After(timeout): cancel() - indexer.Close() + (*globalIndexer.Load()).Close() log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) } }() @@ -303,21 +258,15 @@ func Init() { // UpdateRepoIndexer update a repository's entries in the indexer func UpdateRepoIndexer(repo *repo_model.Repository) { - indexData := &IndexerData{RepoID: repo.ID} + indexData := &internal.IndexerData{RepoID: repo.ID} if err := indexerQueue.Push(indexData); err != nil { log.Error("Update repo index data %v failed: %v", indexData, err) } } // IsAvailable checks if issue indexer is available -func IsAvailable() bool { - idx, err := indexer.get() - if err != nil { - log.Error("IsAvailable(): unable to get indexer: %v", err) - return false - } - - return idx.Ping() +func IsAvailable(ctx context.Context) bool { + return (*globalIndexer.Load()).Ping(ctx) == nil } // populateRepoIndexer populate the repo indexer with pre-existing data. This @@ -368,7 +317,7 @@ func populateRepoIndexer(ctx context.Context) { return default: } - if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil { + if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil { log.Error("indexerQueue.Push: %v", err) return } |