diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2020-09-07 23:05:08 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-07 23:05:08 +0800 |
commit | 91e7ad569ac9590b521e5c4fdfb2162f528db49f (patch) | |
tree | 675316438ca6d716b87849dcf23f5ecd772176d5 /modules/indexer/code | |
parent | a722dd72db7fb6efd9ec0bf730d6b2364ef6337c (diff) | |
download | gitea-91e7ad569ac9590b521e5c4fdfb2162f528db49f.tar.gz gitea-91e7ad569ac9590b521e5c4fdfb2162f528db49f.zip |
Add queue for code indexer (#10332)
* Add queue for code indexer
* Fix lint
* Fix test
* Fix lint
* Fix bug
* Fix bug
* Fix lint
* Add noqueue
* Fix tests
* Rename noqueue to immediate
Diffstat (limited to 'modules/indexer/code')
-rw-r--r-- | modules/indexer/code/elastic_search.go | 5 | ||||
-rw-r--r-- | modules/indexer/code/indexer.go | 158 | ||||
-rw-r--r-- | modules/indexer/code/queue.go | 154 |
3 files changed, 158 insertions, 159 deletions
diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go index 4f690ed806..db36c5e0c4 100644 --- a/modules/indexer/code/elastic_search.go +++ b/modules/indexer/code/elastic_search.go @@ -168,6 +168,11 @@ func (b *ElasticSearchIndexer) init() (bool, error) { } func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { + // Ignore vendored files in code search + if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { + return nil, nil + } + stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). RunInDir(repo.RepoPath()) if err != nil { diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 468955cd89..5456373398 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -14,6 +14,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" ) @@ -38,7 +39,7 @@ type SearchResultLanguages struct { Count int } -// Indexer defines an interface to indexer issues contents +// Indexer defines an interface to index and search code contents type Indexer interface { Index(repo *models.Repository, sha string, changes *repoChanges) error Delete(repoID int64) error @@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string { return indexerID[index+1:] } +// IndexerData represents data stored in the code indexer +type IndexerData struct { + RepoID int64 + IsDelete bool +} + +var ( + indexerQueue queue.Queue +) + +func index(indexer Indexer, repoID int64) error { + repo, err := models.GetRepositoryByID(repoID) + if err != nil { + return err + } + + sha, err := getDefaultBranchSha(repo) + if err != nil { + return err + } + changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil + } + + if err := indexer.Index(repo, sha, changes); err != nil { + return err + } + + return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) +} + // Init initialize the repo indexer func Init() { if !setting.Indexer.RepoIndexerEnabled { @@ -74,8 +109,6 @@ func Init() { return } - initQueue(setting.Indexer.UpdateQueueLength) - ctx, cancel := context.WithCancel(context.Background()) graceful.GetManager().RunAtTerminate(ctx, func() { @@ -85,6 +118,46 @@ func Init() { }) waitChannel := make(chan time.Duration) + + // Create the Queue + switch setting.Indexer.RepoType { + case "bleve", "elasticsearch": + handler := func(data ...queue.Data) { + idx, err := indexer.get() + if idx == nil || err != nil { + log.Error("Codes indexer handler: unable to get indexer!") + return + } + + for _, datum := range data { + indexerData, ok := datum.(*IndexerData) + if !ok { + log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) + continue + } + log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete) + + if indexerData.IsDelete { + if err := indexer.Delete(indexerData.RepoID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else { + if err := index(indexer, indexerData.RepoID); err != nil { + log.Error("index: %v", err) + continue + } + } + } + } + + indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{}) + if indexerQueue == nil { + log.Fatal("Unable to create codes indexer queue") + } + default: + log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType) + } + go func() { start := time.Now() var ( @@ -139,10 +212,11 @@ func Init() { indexer.set(rIndexer) - go processRepoIndexerOperationQueue(indexer) + // Start processing the queue + go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run) if populate { - go populateRepoIndexer() + go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) } select { case waitChannel <- time.Since(start): @@ -179,3 +253,77 @@ func Init() { }() } } + +// DeleteRepoFromIndexer remove all of a repository's entries from the indexer +func DeleteRepoFromIndexer(repo *models.Repository) { + indexData := &IndexerData{RepoID: repo.ID, IsDelete: true} + if err := indexerQueue.Push(indexData); err != nil { + log.Error("Delete repo index data %v failed: %v", indexData, err) + } +} + +// UpdateRepoIndexer update a repository's entries in the indexer +func UpdateRepoIndexer(repo *models.Repository) { + indexData := &IndexerData{RepoID: repo.ID} + if err := indexerQueue.Push(indexData); err != nil { + log.Error("Update repo index data %v failed: %v", indexData, err) + } +} + +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer(ctx context.Context) { + log.Info("Populating the repo indexer with existing repositories") + + exist, err := models.IsTableNotEmpty("repository") + if err != nil { + log.Fatal("System error: %v", err) + } else if !exist { + return + } + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("System error: %v", err) + } + + var maxRepoID int64 + if maxRepoID, err = models.GetMaxID("repository"); err != nil { + log.Fatal("System error: %v", err) + } + + // start with the maximum existing repo ID and work backwards, so that we + // don't include repos that are created after gitea starts; such repos will + // already be added to the indexer, and we don't need to add them again. + for maxRepoID > 0 { + select { + case <-ctx.Done(): + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50) + if err != nil { + log.Error("populateRepoIndexer: %v", err) + return + } else if len(ids) == 0 { + break + } + for _, id := range ids { + select { + case <-ctx.Done(): + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil { + log.Error("indexerQueue.Push: %v", err) + return + } + maxRepoID = id - 1 + } + } + log.Info("Done (re)populating the repo indexer with existing repositories") +} diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go deleted file mode 100644 index 844003e1fc..0000000000 --- a/modules/indexer/code/queue.go +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package code - -import ( - "os" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" -) - -type repoIndexerOperation struct { - repoID int64 - deleted bool - watchers []chan<- error -} - -var repoIndexerOperationQueue chan repoIndexerOperation - -func initQueue(queueLength int) { - repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength) -} - -func index(indexer Indexer, repoID int64) error { - repo, err := models.GetRepositoryByID(repoID) - if err != nil { - return err - } - - sha, err := getDefaultBranchSha(repo) - if err != nil { - return err - } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return err - } else if changes == nil { - return nil - } - - if err := indexer.Index(repo, sha, changes); err != nil { - return err - } - - return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) -} - -func processRepoIndexerOperationQueue(indexer Indexer) { - for { - select { - case op := <-repoIndexerOperationQueue: - var err error - if op.deleted { - if err = indexer.Delete(op.repoID); err != nil { - log.Error("indexer.Delete: %v", err) - } - } else { - if err = index(indexer, op.repoID); err != nil { - log.Error("indexer.Index: %v", err) - } - } - for _, watcher := range op.watchers { - watcher <- err - } - case <-graceful.GetManager().IsShutdown(): - log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) - return - } - } -} - -// DeleteRepoFromIndexer remove all of a repository's entries from the indexer -func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) -} - -// UpdateRepoIndexer update a repository's entries in the indexer -func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) -} - -func addOperationToQueue(op repoIndexerOperation) { - select { - case repoIndexerOperationQueue <- op: - break - default: - go func() { - repoIndexerOperationQueue <- op - }() - } -} - -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -func populateRepoIndexer() { - log.Info("Populating the repo indexer with existing repositories") - - isShutdown := graceful.GetManager().IsShutdown() - - exist, err := models.IsTableNotEmpty("repository") - if err != nil { - log.Fatal("System error: %v", err) - } else if !exist { - return - } - - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - log.Fatal("System error: %v", err) - } - - var maxRepoID int64 - if maxRepoID, err = models.GetMaxID("repository"); err != nil { - log.Fatal("System error: %v", err) - } - - // start with the maximum existing repo ID and work backwards, so that we - // don't include repos that are created after gitea starts; such repos will - // already be added to the indexer, and we don't need to add them again. - for maxRepoID > 0 { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50) - if err != nil { - log.Error("populateRepoIndexer: %v", err) - return - } else if len(ids) == 0 { - break - } - for _, id := range ids { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: id, - deleted: false, - } - maxRepoID = id - 1 - } - } - log.Info("Done (re)populating the repo indexer with existing repositories") -} |