diff options
Diffstat (limited to 'modules/indexer/code/indexer.go')
-rw-r--r-- | modules/indexer/code/indexer.go | 158 |
1 files changed, 153 insertions, 5 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 468955cd89..5456373398 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -14,6 +14,7 @@ import (
 	"code.gitea.io/gitea/models"
 	"code.gitea.io/gitea/modules/graceful"
 	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/queue"
 	"code.gitea.io/gitea/modules/setting"
 	"code.gitea.io/gitea/modules/timeutil"
 )
@@ -38,7 +39,7 @@ type SearchResultLanguages struct {
 	Count int
 }
 
-// Indexer defines an interface to indexer issues contents
+// Indexer defines an interface to index and search code contents
 type Indexer interface {
 	Index(repo *models.Repository, sha string, changes *repoChanges) error
 	Delete(repoID int64) error
@@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string {
 	return indexerID[index+1:]
 }
 
+// IndexerData represents data stored in the code indexer
+type IndexerData struct {
+	RepoID   int64
+	IsDelete bool
+}
+
+var (
+	indexerQueue queue.Queue
+)
+
+func index(indexer Indexer, repoID int64) error {
+	repo, err := models.GetRepositoryByID(repoID)
+	if err != nil {
+		return err
+	}
+
+	sha, err := getDefaultBranchSha(repo)
+	if err != nil {
+		return err
+	}
+	changes, err := getRepoChanges(repo, sha)
+	if err != nil {
+		return err
+	} else if changes == nil {
+		return nil
+	}
+
+	if err := indexer.Index(repo, sha, changes); err != nil {
+		return err
+	}
+
+	return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
+}
+
 // Init initialize the repo indexer
 func Init() {
 	if !setting.Indexer.RepoIndexerEnabled {
@@ -74,8 +109,6 @@ func Init() {
 		return
 	}
 
-	initQueue(setting.Indexer.UpdateQueueLength)
-
 	ctx, cancel := context.WithCancel(context.Background())
 
 	graceful.GetManager().RunAtTerminate(ctx, func() {
@@ -85,6 +118,46 @@ func Init() {
 	})
 
 	waitChannel := make(chan time.Duration)
+
+	// Create the Queue
+	switch setting.Indexer.RepoType {
+	case "bleve", "elasticsearch":
+		handler := func(data ...queue.Data) {
+			idx, err := indexer.get()
+			if idx == nil || err != nil {
+				log.Error("Codes indexer handler: unable to get indexer!")
+				return
+			}
+
+			for _, datum := range data {
+				indexerData, ok := datum.(*IndexerData)
+				if !ok {
+					log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+					continue
+				}
+				log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete)
+
+				if indexerData.IsDelete {
+					if err := indexer.Delete(indexerData.RepoID); err != nil {
+						log.Error("indexer.Delete: %v", err)
+					}
+				} else {
+					if err := index(indexer, indexerData.RepoID); err != nil {
+						log.Error("index: %v", err)
+						continue
+					}
+				}
+			}
+		}
+
+		indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{})
+		if indexerQueue == nil {
+			log.Fatal("Unable to create codes indexer queue")
+		}
+	default:
+		log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType)
+	}
+
 	go func() {
 		start := time.Now()
 		var (
@@ -139,10 +212,11 @@ func Init() {
 
 		indexer.set(rIndexer)
 
-		go processRepoIndexerOperationQueue(indexer)
+		// Start processing the queue
+		go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
 
 		if populate {
-			go populateRepoIndexer()
+			go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
 		}
 		select {
 		case waitChannel <- time.Since(start):
@@ -179,3 +253,77 @@ func Init() {
 		}()
 	}
 }
+
+// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
+func DeleteRepoFromIndexer(repo *models.Repository) {
+	indexData := &IndexerData{RepoID: repo.ID, IsDelete: true}
+	if err := indexerQueue.Push(indexData); err != nil {
+		log.Error("Delete repo index data %v failed: %v", indexData, err)
+	}
+}
+
+// UpdateRepoIndexer update a repository's entries in the indexer
+func UpdateRepoIndexer(repo *models.Repository) {
+	indexData := &IndexerData{RepoID: repo.ID}
+	if err := indexerQueue.Push(indexData); err != nil {
+		log.Error("Update repo index data %v failed: %v", indexData, err)
+	}
+}
+
+// populateRepoIndexer populate the repo indexer with pre-existing data. This
+// should only be run when the indexer is created for the first time.
+func populateRepoIndexer(ctx context.Context) {
+	log.Info("Populating the repo indexer with existing repositories")
+
+	exist, err := models.IsTableNotEmpty("repository")
+	if err != nil {
+		log.Fatal("System error: %v", err)
+	} else if !exist {
+		return
+	}
+
+	// if there is any existing repo indexer metadata in the DB, delete it
+	// since we are starting afresh. Also, xorm requires deletes to have a
+	// condition, and we want to delete everything, thus 1=1.
+	if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
+		log.Fatal("System error: %v", err)
+	}
+
+	var maxRepoID int64
+	if maxRepoID, err = models.GetMaxID("repository"); err != nil {
+		log.Fatal("System error: %v", err)
+	}
+
+	// start with the maximum existing repo ID and work backwards, so that we
+	// don't include repos that are created after gitea starts; such repos will
+	// already be added to the indexer, and we don't need to add them again.
+	for maxRepoID > 0 {
+		select {
+		case <-ctx.Done():
+			log.Info("Repository Indexer population shutdown before completion")
+			return
+		default:
+		}
+		ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
+		if err != nil {
+			log.Error("populateRepoIndexer: %v", err)
+			return
+		} else if len(ids) == 0 {
+			break
+		}
+		for _, id := range ids {
+			select {
+			case <-ctx.Done():
+				log.Info("Repository Indexer population shutdown before completion")
+				return
+			default:
+			}
+			if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil {
+				log.Error("indexerQueue.Push: %v", err)
+				return
+			}
+			maxRepoID = id - 1
+		}
+	}
+	log.Info("Done (re)populating the repo indexer with existing repositories")
+}