aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2019-12-15 09:51:28 +0000
committerGitHub <noreply@github.com>2019-12-15 09:51:28 +0000
commite3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c (patch)
tree21dcdc6ec138a502590550672ac0a11f364935ea /modules/indexer
parent8bea92c3dc162e24f6dcc2902dfed5ab94576826 (diff)
downloadgitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.tar.gz
gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.zip
Graceful: Xorm, RepoIndexer, Cron and Others (#9282)
* Change graceful to use a singleton obtained through GetManager instead of a global. * Graceful: Make TestPullRequests shutdownable * Graceful: Make the cron tasks graceful * Graceful: AddTestPullRequest run in graceful ctx * Graceful: SyncMirrors shutdown * Graceful: SetDefaultContext for Xorm to be HammerContext * Avoid starting graceful for migrate commands and checkout * Graceful: DeliverHooks now can be shutdown * Fix multiple syncing errors in modules/sync/UniqueQueue & Make UniqueQueue closable * Begin the process of making the repo indexer shutdown gracefully
Diffstat (limited to 'modules/indexer')
-rw-r--r--modules/indexer/code/bleve.go82
-rw-r--r--modules/indexer/code/repo.go35
-rw-r--r--modules/indexer/issues/indexer.go2
3 files changed, 84 insertions, 35 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go
index c8e1bb1879..bb2fc5bc74 100644
--- a/modules/indexer/code/bleve.go
+++ b/modules/indexer/code/bleve.go
@@ -6,6 +6,7 @@ package code
import (
"fmt"
+ "os"
"strconv"
"strings"
"time"
@@ -34,10 +35,11 @@ func InitRepoIndexer() {
return
}
waitChannel := make(chan time.Duration)
+ // FIXME: graceful: This should use a persistable queue
repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
go func() {
start := time.Now()
- log.Info("Initializing Repository Indexer")
+ log.Info("PID: %d: Initializing Repository Indexer", os.Getpid())
initRepoIndexer(populateRepoIndexerAsynchronously)
go processRepoIndexerOperationQueue()
waitChannel <- time.Since(start)
@@ -45,7 +47,7 @@ func InitRepoIndexer() {
if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
- if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 {
+ if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
}
select {
@@ -70,13 +72,6 @@ func populateRepoIndexerAsynchronously() error {
return nil
}
- // if there is any existing repo indexer metadata in the DB, delete it
- // since we are starting afresh. Also, xorm requires deletes to have a
- // condition, and we want to delete everything, thus 1=1.
- if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
- return err
- }
-
var maxRepoID int64
if maxRepoID, err = models.GetMaxID("repository"); err != nil {
return err
@@ -87,44 +82,59 @@ func populateRepoIndexerAsynchronously() error {
// populateRepoIndexer populate the repo indexer with pre-existing data. This
// should only be run when the indexer is created for the first time.
+// FIXME: graceful: This should use a persistable queue
func populateRepoIndexer(maxRepoID int64) {
log.Info("Populating the repo indexer with existing repositories")
+
+ isShutdown := graceful.GetManager().IsShutdown()
+
// start with the maximum existing repo ID and work backwards, so that we
// don't include repos that are created after gitea starts; such repos will
// already be added to the indexer, and we don't need to add them again.
for maxRepoID > 0 {
- repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize)
- err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos)
+ select {
+ case <-isShutdown:
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
+ ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50)
if err != nil {
log.Error("populateRepoIndexer: %v", err)
return
- } else if len(repos) == 0 {
+ } else if len(ids) == 0 {
break
}
- for _, repo := range repos {
+ for _, id := range ids {
+ select {
+ case <-isShutdown:
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
repoIndexerOperationQueue <- repoIndexerOperation{
- repoID: repo.ID,
+ repoID: id,
deleted: false,
}
- maxRepoID = repo.ID - 1
+ maxRepoID = id - 1
}
}
- log.Info("Done populating the repo indexer with existing repositories")
+ log.Info("Done (re)populating the repo indexer with existing repositories")
}
func updateRepoIndexer(repoID int64) error {
repo, err := models.GetRepositoryByID(repoID)
if err != nil {
- return err
+ return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err)
}
sha, err := getDefaultBranchSha(repo)
if err != nil {
- return err
+ return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err)
}
changes, err := getRepoChanges(repo, sha)
if err != nil {
- return err
+ return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err)
} else if changes == nil {
return nil
}
@@ -132,16 +142,16 @@ func updateRepoIndexer(repoID int64) error {
batch := RepoIndexerBatch()
for _, update := range changes.Updates {
if err := addUpdate(update, repo, batch); err != nil {
- return err
+ return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err)
}
}
for _, filename := range changes.RemovedFilenames {
if err := addDelete(filename, repo, batch); err != nil {
- return err
+ return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err)
}
}
if err = batch.Flush(); err != nil {
- return err
+ return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err)
}
return repo.UpdateIndexerStatus(sha)
}
@@ -322,20 +332,26 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges,
func processRepoIndexerOperationQueue() {
for {
- op := <-repoIndexerOperationQueue
- var err error
- if op.deleted {
- if err = deleteRepoFromIndexer(op.repoID); err != nil {
- log.Error("deleteRepoFromIndexer: %v", err)
+ select {
+ case op := <-repoIndexerOperationQueue:
+ var err error
+ if op.deleted {
+ if err = deleteRepoFromIndexer(op.repoID); err != nil {
+ log.Error("DeleteRepoFromIndexer: %v", err)
+ }
+ } else {
+ if err = updateRepoIndexer(op.repoID); err != nil {
+ log.Error("updateRepoIndexer: %v", err)
+ }
}
- } else {
- if err = updateRepoIndexer(op.repoID); err != nil {
- log.Error("updateRepoIndexer: %v", err)
+ for _, watcher := range op.watchers {
+ watcher <- err
}
+ case <-graceful.GetManager().IsShutdown():
+ log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
+ return
}
- for _, watcher := range op.watchers {
- watcher <- err
- }
+
}
}
diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go
index 31f0fa7f3d..bc5f317b7d 100644
--- a/modules/indexer/code/repo.go
+++ b/modules/indexer/code/repo.go
@@ -5,9 +5,13 @@
package code
import (
+ "context"
+ "os"
"strings"
"sync"
+ "code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@@ -104,21 +108,50 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch)
func initRepoIndexer(populateIndexer func() error) {
indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
if err != nil {
- log.Fatal("InitRepoIndexer: %v", err)
+ log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err)
}
if indexer != nil {
indexerHolder.set(indexer)
+ closeAtTerminate()
+
+ // Continue population from where left off
+ if err = populateIndexer(); err != nil {
+ log.Fatal("PopulateRepoIndex: %v", err)
+ }
return
}
if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil {
log.Fatal("CreateRepoIndexer: %v", err)
}
+ closeAtTerminate()
+
+ // if there is any existing repo indexer metadata in the DB, delete it
+ // since we are starting afresh. Also, xorm requires deletes to have a
+ // condition, and we want to delete everything, thus 1=1.
+ if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
+ log.Fatal("DeleteAllRepoIndexerStatus: %v", err)
+ }
+
if err = populateIndexer(); err != nil {
log.Fatal("PopulateRepoIndex: %v", err)
}
}
+func closeAtTerminate() {
+ graceful.GetManager().RunAtTerminate(context.Background(), func() {
+ log.Debug("Closing repo indexer")
+ indexer := indexerHolder.get()
+ if indexer != nil {
+ err := indexer.Close()
+ if err != nil {
+ log.Error("Error whilst closing the repository indexer: %v", err)
+ }
+ }
+ log.Info("PID: %d Repository Indexer closed", os.Getpid())
+ })
+}
+
// createRepoIndexer create a repo indexer if one does not already exist
func createRepoIndexer(path string, latestVersion int) error {
docMapping := bleve.NewDocumentMapping()
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 78eba58095..50b8d6d224 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -172,7 +172,7 @@ func InitIssueIndexer(syncReindex bool) {
} else if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
- if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 {
+ if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
}
select {