summaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2019-12-15 09:51:28 +0000
committerGitHub <noreply@github.com>2019-12-15 09:51:28 +0000
commite3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c (patch)
tree21dcdc6ec138a502590550672ac0a11f364935ea /services
parent8bea92c3dc162e24f6dcc2902dfed5ab94576826 (diff)
downloadgitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.tar.gz
gitea-e3c3b33ea7a5a223e22688c3f0eb2d3dab9f991c.zip
Graceful: Xorm, RepoIndexer, Cron and Others (#9282)
* Change graceful to use a singleton obtained through GetManager instead of a global. * Graceful: Make TestPullRequests shutdownable * Graceful: Make the cron tasks graceful * Graceful: AddTestPullRequest run in graceful ctx * Graceful: SyncMirrors shutdown * Graceful: SetDefaultContext for Xorm to be HammerContext * Avoid starting graceful for migrate commands and checkout * Graceful: DeliverHooks now can be shutdown * Fix multiple syncing errors in modules/sync/UniqueQueue & Make UniqueQueue closable * Begin the process of making the repo indexer shutdown gracefully
Diffstat (limited to 'services')
-rw-r--r--services/mirror/mirror.go32
-rw-r--r--services/pull/check.go90
-rw-r--r--services/pull/pull.go63
3 files changed, 98 insertions, 87 deletions
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 1ad9448b6b..7fc6e97b46 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -5,11 +5,14 @@
package mirror
import (
+ "context"
"fmt"
"net/url"
"strings"
"time"
+ "code.gitea.io/gitea/modules/graceful"
+
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cache"
"code.gitea.io/gitea/modules/git"
@@ -294,29 +297,38 @@ func Password(m *models.Mirror) string {
}
// Update checks and updates mirror repositories.
-func Update() {
+func Update(ctx context.Context) {
log.Trace("Doing: Update")
-
if err := models.MirrorsIterate(func(idx int, bean interface{}) error {
m := bean.(*models.Mirror)
if m.Repo == nil {
log.Error("Disconnected mirror repository found: %d", m.ID)
return nil
}
-
- mirrorQueue.Add(m.RepoID)
- return nil
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("Aborted due to shutdown")
+ default:
+ mirrorQueue.Add(m.RepoID)
+ return nil
+ }
}); err != nil {
log.Error("Update: %v", err)
}
}
// SyncMirrors checks and syncs mirrors.
-// TODO: sync more mirrors at same time.
-func SyncMirrors() {
+// FIXME: graceful: this should be a persistable queue
+func SyncMirrors(ctx context.Context) {
// Start listening on new sync requests.
- for repoID := range mirrorQueue.Queue() {
- syncMirror(repoID)
+ for {
+ select {
+ case <-ctx.Done():
+ mirrorQueue.Close()
+ return
+ case repoID := <-mirrorQueue.Queue():
+ syncMirror(repoID)
+ }
}
}
@@ -416,7 +428,7 @@ func syncMirror(repoID string) {
// InitSyncMirrors initializes a go routine to sync the mirrors
func InitSyncMirrors() {
- go SyncMirrors()
+ go graceful.GetManager().RunWithShutdownContext(SyncMirrors)
}
// StartToMirror adds repoID to mirror queue
diff --git a/services/pull/check.go b/services/pull/check.go
index fc2ac927b8..7344f071ac 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -6,6 +6,7 @@
package pull
import (
+ "context"
"fmt"
"io/ioutil"
"os"
@@ -16,6 +17,7 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/git"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
@@ -151,65 +153,53 @@ func manuallyMerged(pr *models.PullRequest) bool {
// TestPullRequests checks and tests untested patches of pull requests.
// TODO: test more pull requests at same time.
-func TestPullRequests() {
- prs, err := models.GetPullRequestsByCheckStatus(models.PullRequestStatusChecking)
- if err != nil {
- log.Error("Find Checking PRs: %v", err)
- return
- }
-
- var checkedPRs = make(map[int64]struct{})
+func TestPullRequests(ctx context.Context) {
- // Update pull request status.
- for _, pr := range prs {
- checkedPRs[pr.ID] = struct{}{}
- if err := pr.GetBaseRepo(); err != nil {
- log.Error("GetBaseRepo: %v", err)
- continue
- }
- if manuallyMerged(pr) {
- continue
+ go func() {
+ prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
+ if err != nil {
+ log.Error("Find Checking PRs: %v", err)
+ return
}
- if err := TestPatch(pr); err != nil {
- log.Error("testPatch: %v", err)
- continue
+ for _, prID := range prs {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ pullRequestQueue.Add(prID)
+ }
}
-
- checkAndUpdateStatus(pr)
- }
+ }()
// Start listening on new test requests.
- for prID := range pullRequestQueue.Queue() {
- log.Trace("TestPullRequests[%v]: processing test task", prID)
- pullRequestQueue.Remove(prID)
-
- id := com.StrTo(prID).MustInt64()
- if _, ok := checkedPRs[id]; ok {
- continue
- }
-
- pr, err := models.GetPullRequestByID(id)
- if err != nil {
- log.Error("GetPullRequestByID[%s]: %v", prID, err)
- continue
- } else if manuallyMerged(pr) {
- continue
- }
- pr.Status = models.PullRequestStatusChecking
- if err := pr.Update(); err != nil {
- log.Error("testPatch[%d]: Unable to update status to Checking Status %v", pr.ID, err)
- continue
- }
- if err = TestPatch(pr); err != nil {
- log.Error("testPatch[%d]: %v", pr.ID, err)
- continue
+ for {
+ select {
+ case prID := <-pullRequestQueue.Queue():
+ log.Trace("TestPullRequests[%v]: processing test task", prID)
+ pullRequestQueue.Remove(prID)
+
+ id := com.StrTo(prID).MustInt64()
+
+ pr, err := models.GetPullRequestByID(id)
+ if err != nil {
+ log.Error("GetPullRequestByID[%s]: %v", prID, err)
+ continue
+ } else if manuallyMerged(pr) {
+ continue
+ } else if err = TestPatch(pr); err != nil {
+ log.Error("testPatch[%d]: %v", pr.ID, err)
+ continue
+ }
+ checkAndUpdateStatus(pr)
+ case <-ctx.Done():
+ pullRequestQueue.Close()
+ log.Info("PID: %d Pull Request testing shutdown", os.Getpid())
+ return
}
-
- checkAndUpdateStatus(pr)
}
}
// Init runs the task queue to test all the checking status pull requests
func Init() {
- go TestPullRequests()
+ go graceful.GetManager().RunWithShutdownContext(TestPullRequests)
}
diff --git a/services/pull/pull.go b/services/pull/pull.go
index 6447c8a87f..e7f4e4eede 100644
--- a/services/pull/pull.go
+++ b/services/pull/pull.go
@@ -5,12 +5,14 @@
package pull
import (
+ "context"
"fmt"
"os"
"path"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/git"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
issue_service "code.gitea.io/gitea/services/issue"
@@ -54,6 +56,7 @@ func checkForInvalidation(requests models.PullRequestList, repoID int64, doer *m
return fmt.Errorf("git.OpenRepository: %v", err)
}
go func() {
+ // FIXME: graceful: We need to tell the manager we're doing something...
err := requests.InvalidateCodeComments(doer, gitRepo, branch)
if err != nil {
log.Error("PullRequestList.InvalidateCodeComments: %v", err)
@@ -79,39 +82,45 @@ func addHeadRepoTasks(prs []*models.PullRequest) {
// and generate new patch for testing as needed.
func AddTestPullRequestTask(doer *models.User, repoID int64, branch string, isSync bool) {
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch)
- prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch)
- if err != nil {
- log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
- return
- }
+ graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
+ // There is no sensible way to shut this down ":-("
+ // If you don't let it run all the way then you will lose data
+ // FIXME: graceful: AddTestPullRequestTask needs to become a queue!
- if isSync {
- requests := models.PullRequestList(prs)
- if err = requests.LoadAttributes(); err != nil {
- log.Error("PullRequestList.LoadAttributes: %v", err)
- }
- if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil {
- log.Error("checkForInvalidation: %v", invalidationErr)
+ prs, err := models.GetUnmergedPullRequestsByHeadInfo(repoID, branch)
+ if err != nil {
+ log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
+ return
}
- if err == nil {
- for _, pr := range prs {
- pr.Issue.PullRequest = pr
- notification.NotifyPullRequestSynchronized(doer, pr)
+
+ if isSync {
+ requests := models.PullRequestList(prs)
+ if err = requests.LoadAttributes(); err != nil {
+ log.Error("PullRequestList.LoadAttributes: %v", err)
+ }
+ if invalidationErr := checkForInvalidation(requests, repoID, doer, branch); invalidationErr != nil {
+ log.Error("checkForInvalidation: %v", invalidationErr)
+ }
+ if err == nil {
+ for _, pr := range prs {
+ pr.Issue.PullRequest = pr
+ notification.NotifyPullRequestSynchronized(doer, pr)
+ }
}
}
- }
- addHeadRepoTasks(prs)
+ addHeadRepoTasks(prs)
- log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch)
- prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch)
- if err != nil {
- log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err)
- return
- }
- for _, pr := range prs {
- AddToTaskQueue(pr)
- }
+ log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch)
+ prs, err = models.GetUnmergedPullRequestsByBaseInfo(repoID, branch)
+ if err != nil {
+ log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err)
+ return
+ }
+ for _, pr := range prs {
+ AddToTaskQueue(pr)
+ }
+ })
}
// PushToBaseRepo pushes commits from branches of head repository to