diff options
author | zeripath <art27@cantab.net> | 2019-12-15 09:51:28 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-15 09:51:28 +0000 |
commit | e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c (patch) | |
tree | 21dcdc6ec138a502590550672ac0a11f364935ea /services | |
parent | 8bea92c3dc162e24f6dcc2902dfed5ab94576826 (diff) | |
download | gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.tar.gz gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.zip |
Graceful: Xorm, RepoIndexer, Cron and Others (#9282)
* Change graceful to use a singleton obtained through GetManager instead of a global.
* Graceful: Make TestPullRequests shutdownable
* Graceful: Make the cron tasks graceful
* Graceful: AddTestPullRequest run in graceful ctx
* Graceful: SyncMirrors shutdown
* Graceful: SetDefaultContext for Xorm to be HammerContext
* Avoid starting graceful for migrate commands and checkout
* Graceful: DeliverHooks now can be shutdown
* Fix multiple syncing errors in modules/sync/UniqueQueue & Make UniqueQueue closable
* Begin the process of making the repo indexer shutdown gracefully
Diffstat (limited to 'services')
-rw-r--r-- | services/mirror/mirror.go | 32 | ||||
-rw-r--r-- | services/pull/check.go | 90 | ||||
-rw-r--r-- | services/pull/pull.go | 63 |
3 files changed, 98 insertions, 87 deletions
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 1ad9448b6b..7fc6e97b46 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -5,11 +5,14 @@ package mirror import ( + "context" "fmt" "net/url" "strings" "time" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/git" @@ -294,29 +297,38 @@ func Password(m *models.Mirror) string { } // Update checks and updates mirror repositories. -func Update() { +func Update(ctx context.Context) { log.Trace("Doing: Update") - if err := models.MirrorsIterate(func(idx int, bean interface{}) error { m := bean.(*models.Mirror) if m.Repo == nil { log.Error("Disconnected mirror repository found: %d", m.ID) return nil } - - mirrorQueue.Add(m.RepoID) - return nil + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown") + default: + mirrorQueue.Add(m.RepoID) + return nil + } }); err != nil { log.Error("Update: %v", err) } } // SyncMirrors checks and syncs mirrors. -// TODO: sync more mirrors at same time. -func SyncMirrors() { +// FIXME: graceful: this should be a persistable queue +func SyncMirrors(ctx context.Context) { // Start listening on new sync requests. - for repoID := range mirrorQueue.Queue() { - syncMirror(repoID) + for { + select { + case <-ctx.Done(): + mirrorQueue.Close() + return + case repoID := <-mirrorQueue.Queue(): + syncMirror(repoID) + } } } @@ -416,7 +428,7 @@ func syncMirror(repoID string) { // InitSyncMirrors initializes a go routine to sync the mirrors func InitSyncMirrors() { - go SyncMirrors() + go graceful.GetManager().RunWithShutdownContext(SyncMirrors) } // StartToMirror adds repoID to mirror queue diff --git a/services/pull/check.go b/services/pull/check.go index fc2ac927b8..7344f071ac 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -6,6 +6,7 @@ package pull import ( + "context" "fmt" "io/ioutil" "os" @@ -16,6 +17,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/sync" @@ -151,65 +153,53 @@ func manuallyMerged(pr *models.PullRequest) bool { // TestPullRequests checks and tests untested patches of pull requests. // TODO: test more pull requests at same time. -func TestPullRequests() { - prs, err := models.GetPullRequestsByCheckStatus(models.PullRequestStatusChecking) - if err != nil { - log.Error("Find Checking PRs: %v", err) - return - } - - var checkedPRs = make(map[int64]struct{}) +func TestPullRequests(ctx context.Context) { - // Update pull request status. - for _, pr := range prs { - checkedPRs[pr.ID] = struct{}{} - if err := pr.GetBaseRepo(); err != nil { - log.Error("GetBaseRepo: %v", err) - continue - } - if manuallyMerged(pr) { - continue + go func() { + prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking) + if err != nil { + log.Error("Find Checking PRs: %v", err) + return } - if err := TestPatch(pr); err != nil { - log.Error("testPatch: %v", err) - continue + for _, prID := range prs { + select { + case <-ctx.Done(): + return + default: + pullRequestQueue.Add(prID) + } } - - checkAndUpdateStatus(pr) - } + }() // Start listening on new test requests. - for prID := range pullRequestQueue.Queue() { - log.Trace("TestPullRequests[%v]: processing test task", prID) - pullRequestQueue.Remove(prID) - - id := com.StrTo(prID).MustInt64() - if _, ok := checkedPRs[id]; ok { - continue - } - - pr, err := models.GetPullRequestByID(id) - if err != nil { - log.Error("GetPullRequestByID[%s]: %v", prID, err) - continue - } else if manuallyMerged(pr) { - continue - } - pr.Status = models.PullRequestStatusChecking - if err := pr.Update(); err != nil { - log.Error("testPatch[%d]: Unable to update status to Checking Status %v", pr.ID, err) - continue - } - if err = TestPatch(pr); err != nil { - log.Error("testPatch[%d]: %v", pr.ID, err) - continue + for { + select { + case prID := <-pullRequestQueue.Queue(): + log.Trace("TestPullRequests[%v]: processing test task", prID) + pullRequestQueue.Remove(prID) + + id := com.StrTo(prID).MustInt64() + + pr, err := models.GetPullRequestByID(id) + if err != nil { + log.Error("GetPullRequestByID[%s]: %v", prID, err) + continue + } else if manuallyMerged(pr) { + continue + } else if err = TestPatch(pr); err != nil { + log.Error("testPatch[%d]: %v", pr.ID, err) + continue + } + checkAndUpdateStatus(pr) + case <-ctx.Done(): + pullRequestQueue.Close() + log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) + return } - - checkAndUpdateStatus(pr) } } // Init runs the task queue to test all the checking status pull requests func Init() { - go TestPullRequests() + go graceful.GetManager().RunWithShutdownContext(TestPullRequests) } diff --git a/services/pull/pull.go b/services/pull/pull.go index 6447c8a87f..e7f4e4eede 100644 --- a/services/pull/pull.go +++ b/services/pull/pull.go @@ -5,12 +5,14 @@ package pull import ( + "context" "fmt" "os" "path" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" issue_service "code.gitea.io/gitea/services/issue" @@ -54,6 +56,7 @@ func checkForInvalidation(requests models.PullRequestList, repoID int64, doer *m return fmt.Errorf("git.OpenRepository: %v", err) } go func() { + // FIXME: graceful: We need to tell the manager we're doing something... err := requests.InvalidateCodeComments(doer, gitRepo, branch) if err != nil { log.Error("PullRequestList.InvalidateCodeComments: %v", err) @@ -79,39 +82,45 @@ func addHeadRepoTasks(prs []*models.PullRequest) { // and generate new patch for testing as needed. func AddTestPullRequestTask(doer *models.User, repoID int64, branch string, isSync bool) { log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch) - prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch) - if err != nil { - log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err) - return - } + graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) { + // There is no sensible way to shut this down ":-(" + // If you don't let it run all the way then you will lose data + // FIXME: graceful: AddTestPullRequestTask needs to become a queue! - if isSync { - requests := models.PullRequestList(prs) - if err = requests.LoadAttributes(); err != nil { - log.Error("PullRequestList.LoadAttributes: %v", err) - } - if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil { - log.Error("checkForInvalidation: %v", invalidationErr) + prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch) + if err != nil { + log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err) + return } - if err == nil { - for _, pr := range prs { - pr.Issue.PullRequest = pr - notification.NotifyPullRequestSynchronized(doer, pr) + + if isSync { + requests := models.PullRequestList(prs) + if err = requests.LoadAttributes(); err != nil { + log.Error("PullRequestList.LoadAttributes: %v", err) + } + if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil { + log.Error("checkForInvalidation: %v", invalidationErr) + } + if err == nil { + for _, pr := range prs { + pr.Issue.PullRequest = pr + notification.NotifyPullRequestSynchronized(doer, pr) + } } } - } - addHeadRepoTasks(prs) + addHeadRepoTasks(prs) - log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch) - prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch) - if err != nil { - log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err) - return - } - for _, pr := range prs { - AddToTaskQueue(pr) - } + log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch) + prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch) + if err != nil { + log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err) + return + } + for _, pr := range prs { + AddToTaskQueue(pr) + } + }) } // PushToBaseRepo pushes commits from branches of head repository to |