diff options
Diffstat (limited to 'modules/indexer/code/bleve.go')
-rw-r--r-- | modules/indexer/code/bleve.go | 82 |
1 files changed, 49 insertions, 33 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index c8e1bb1879..bb2fc5bc74 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -6,6 +6,7 @@ package code import ( "fmt" + "os" "strconv" "strings" "time" @@ -34,10 +35,11 @@ func InitRepoIndexer() { return } waitChannel := make(chan time.Duration) + // FIXME: graceful: This should use a persistable queue repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) go func() { start := time.Now() - log.Info("Initializing Repository Indexer") + log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) initRepoIndexer(populateRepoIndexerAsynchronously) go processRepoIndexerOperationQueue() waitChannel <- time.Since(start) @@ -45,7 +47,7 @@ func InitRepoIndexer() { if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { @@ -70,13 +72,6 @@ func populateRepoIndexerAsynchronously() error { return nil } - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - return err - } - var maxRepoID int64 if maxRepoID, err = models.GetMaxID("repository"); err != nil { return err @@ -87,44 +82,59 @@ func populateRepoIndexerAsynchronously() error { // populateRepoIndexer populate the repo indexer with pre-existing data. This // should only be run when the indexer is created for the first time. +// FIXME: graceful: This should use a persistable queue func populateRepoIndexer(maxRepoID int64) { log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + // start with the maximum existing repo ID and work backwards, so that we // don't include repos that are created after gitea starts; such repos will // already be added to the indexer, and we don't need to add them again. for maxRepoID > 0 { - repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) - err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) if err != nil { log.Error("populateRepoIndexer: %v", err) return - } else if len(repos) == 0 { + } else if len(ids) == 0 { break } - for _, repo := range repos { + for _, id := range ids { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: repo.ID, + repoID: id, deleted: false, } - maxRepoID = repo.ID - 1 + maxRepoID = id - 1 } } - log.Info("Done populating the repo indexer with existing repositories") + log.Info("Done (re)populating the repo indexer with existing repositories") } func updateRepoIndexer(repoID int64) error { repo, err := models.GetRepositoryByID(repoID) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) } sha, err := getDefaultBranchSha(repo) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) } changes, err := getRepoChanges(repo, sha) if err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) } else if changes == nil { return nil } @@ -132,16 +142,16 @@ func updateRepoIndexer(repoID int64) error { batch := RepoIndexerBatch() for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) } } for _, filename := range changes.RemovedFilenames { if err := addDelete(filename, repo, batch); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) } } if err = batch.Flush(); err != nil { - return err + return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } return repo.UpdateIndexerStatus(sha) } @@ -322,20 +332,26 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, func processRepoIndexerOperationQueue() { for { - op := <-repoIndexerOperationQueue - var err error - if op.deleted { - if err = deleteRepoFromIndexer(op.repoID); err != nil { - log.Error("deleteRepoFromIndexer: %v", err) + select { + case op := <-repoIndexerOperationQueue: + var err error + if op.deleted { + if err = deleteRepoFromIndexer(op.repoID); err != nil { + log.Error("DeleteRepoFromIndexer: %v", err) + } + } else { + if err = updateRepoIndexer(op.repoID); err != nil { + log.Error("updateRepoIndexer: %v", err) + } } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) + for _, watcher := range op.watchers { + watcher <- err } + case <-graceful.GetManager().IsShutdown(): + log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) + return } - for _, watcher := range op.watchers { - watcher <- err - } + } } |