diff options
author | zeripath <art27@cantab.net> | 2019-12-15 09:51:28 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-15 09:51:28 +0000 |
commit | e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c (patch) | |
tree | 21dcdc6ec138a502590550672ac0a11f364935ea /modules/indexer/code | |
parent | 8bea92c3dc162e24f6dcc2902dfed5ab94576826 (diff) | |
download | gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.tar.gz gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.zip |
Graceful: Xorm, RepoIndexer, Cron and Others (#9282)
* Change graceful to use a singleton obtained through GetManager instead of a global.
* Graceful: Make TestPullRequests shutdownable
* Graceful: Make the cron tasks graceful
* Graceful: AddTestPullRequest run in graceful ctx
* Graceful: SyncMirrors shutdown
* Graceful: SetDefaultContext for Xorm to be HammerContext
* Avoid starting graceful for migrate commands and checkout
* Graceful: DeliverHooks now can be shutdown
* Fix multiple syncing errors in modules/sync/UniqueQueue & Make UniqueQueue closable
* Begin the process of making the repo indexer shutdown gracefully
Diffstat (limited to 'modules/indexer/code')
-rw-r--r-- | modules/indexer/code/bleve.go | 82 | ||||
-rw-r--r-- | modules/indexer/code/repo.go | 35 |
2 files changed, 83 insertions, 34 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index c8e1bb1879..bb2fc5bc74 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -6,6 +6,7 @@ package code import ( "fmt" + "os" "strconv" "strings" "time" @@ -34,10 +35,11 @@ func InitRepoIndexer() { return } waitChannel := make(chan time.Duration) + // FIXME: graceful: This should use a persistable queue repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) go func() { start := time.Now() - log.Info("Initializing Repository Indexer") + log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) initRepoIndexer(populateRepoIndexerAsynchronously) go processRepoIndexerOperationQueue() waitChannel <- time.Since(start) @@ -45,7 +47,7 @@ func InitRepoIndexer() { if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { @@ -70,13 +72,6 @@ func populateRepoIndexerAsynchronously() error { return nil } - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - return err - } - var maxRepoID int64 if maxRepoID, err = models.GetMaxID("repository"); err != nil { return err @@ -87,44 +82,59 @@ func populateRepoIndexerAsynchronously() error { // populateRepoIndexer populate the repo indexer with pre-existing data. This // should only be run when the indexer is created for the first time. +// FIXME: graceful: This should use a persistable queue func populateRepoIndexer(maxRepoID int64) { log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + // start with the maximum existing repo ID and work backwards, so that we // don't include repos that are created after gitea starts; such repos will // already be added to the indexer, and we don't need to add them again. for maxRepoID > 0 { - repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) - err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) if err != nil { log.Error("populateRepoIndexer: %v", err) return - } else if len(repos) == 0 { + } else if len(ids) == 0 { break } - for _, repo := range repos { + for _, id := range ids { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: repo.ID, + repoID: id, deleted: false, } - maxRepoID = repo.ID - 1 + maxRepoID = id - 1 } } - log.Info("Done populating the repo indexer with existing repositories") + log.Info("Done (re)populating the repo indexer with existing repositories") } func updateRepoIndexer(repoID int64) error { repo, err := models.GetRepositoryByID(repoID) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) } sha, err := getDefaultBranchSha(repo) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) } changes, err := getRepoChanges(repo, sha) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) } else if changes == nil { return nil } @@ -132,16 +142,16 @@ func updateRepoIndexer(repoID int64) error { batch := RepoIndexerBatch() for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) } } for _, filename := range changes.RemovedFilenames { if err := addDelete(filename, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) } } if err = batch.Flush(); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } return repo.UpdateIndexerStatus(sha) } @@ -322,20 +332,26 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, func processRepoIndexerOperationQueue() { for { - op := <-repoIndexerOperationQueue - var err error - if op.deleted { - if err = deleteRepoFromIndexer(op.repoID); err != nil { - log.Error("deleteRepoFromIndexer: %v", err) + select { + case op := <-repoIndexerOperationQueue: + var err error + if op.deleted { + if err = deleteRepoFromIndexer(op.repoID); err != nil { + log.Error("DeleteRepoFromIndexer: %v", err) + } + } else { + if err = updateRepoIndexer(op.repoID); err != nil { + log.Error("updateRepoIndexer: %v", err) + } } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) + for _, watcher := range op.watchers { + watcher <- err } + case <-graceful.GetManager().IsShutdown(): + log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) + return } - for _, watcher := range op.watchers { - watcher <- err - } + } } diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go index 31f0fa7f3d..bc5f317b7d 100644 --- a/modules/indexer/code/repo.go +++ b/modules/indexer/code/repo.go @@ -5,9 +5,13 @@ package code import ( + "context" + "os" "strings" "sync" + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -104,21 +108,50 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) func initRepoIndexer(populateIndexer func() error) { indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { - log.Fatal("InitRepoIndexer: %v", err) + log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err) } if indexer != nil { indexerHolder.set(indexer) + closeAtTerminate() + + // Continue population from where left off + if err = populateIndexer(); err != nil { + log.Fatal("PopulateRepoIndex: %v", err) + } return } if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { log.Fatal("CreateRepoIndexer: %v", err) } + closeAtTerminate() + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("DeleteAllRepoIndexerStatus: %v", err) + } + if err = populateIndexer(); err != nil { log.Fatal("PopulateRepoIndex: %v", err) } } +func closeAtTerminate() { + graceful.GetManager().RunAtTerminate(context.Background(), func() { + log.Debug("Closing repo indexer") + indexer := indexerHolder.get() + if indexer != nil { + err := indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) + } + } + log.Info("PID: %d Repository Indexer closed", os.Getpid()) + }) +} + // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) error { docMapping := bleve.NewDocumentMapping() |