diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /services | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'services')
-rw-r--r-- | services/pull/check.go | 134 | ||||
-rw-r--r-- | services/pull/check_test.go | 59 |
2 files changed, 133 insertions, 60 deletions
diff --git a/services/pull/check.go b/services/pull/check.go index 5d380b4609..d64f49de3b 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "os" + "strconv" "strings" "code.gitea.io/gitea/models" @@ -17,24 +18,32 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" - "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/timeutil" "github.com/unknwon/com" ) -// pullRequestQueue represents a queue to handle update pull request tests -var pullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength) +// prQueue represents a queue to handle update pull request tests +var prQueue queue.UniqueQueue // AddToTaskQueue adds itself to pull request test task queue. func AddToTaskQueue(pr *models.PullRequest) { - go pullRequestQueue.AddFunc(pr.ID, func() { - pr.Status = models.PullRequestStatusChecking - if err := pr.UpdateCols("status"); err != nil { - log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err) + go func() { + err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error { + pr.Status = models.PullRequestStatusChecking + err := pr.UpdateCols("status") + if err != nil { + log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err) + } else { + log.Trace("Adding PR ID: %d to the test pull requests queue", pr.ID) + } + return err + }) + if err != nil && err != queue.ErrAlreadyInQueue { + log.Error("Error adding prID %d to the test pull requests queue: %v", pr.ID, err) } - }) + }() } // checkAndUpdateStatus checks if pull request is possible to leaving checking status, @@ -46,7 +55,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) { } // Make sure there is no waiting test to process before leaving the checking status. - if !pullRequestQueue.Exist(pr.ID) { + has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10)) + if err != nil { + log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err) + } + + if !has { if err := pr.UpdateCols("status, conflicted_files"); err != nil { log.Error("Update[%d]: %v", pr.ID, err) } @@ -73,7 +87,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) { headFile := pr.GetGitRefName() // Check if a pull request is merged into BaseBranch - _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) + _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch). + RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) if err != nil { // Errors are signaled by a non-zero status that is not 1 if strings.Contains(err.Error(), "exit status 1") { @@ -93,7 +108,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) { cmd := commitID[:40] + ".." + pr.BaseBranch // Get the commit from BaseBranch where the pull request got merged - mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) + mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd). + RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()}) if err != nil { return nil, fmt.Errorf("git rev-list --ancestry-path --merges --reverse: %v", err) } else if len(mergeCommit) < 40 { @@ -155,61 +171,65 @@ func manuallyMerged(pr *models.PullRequest) bool { return false } -// TestPullRequests checks and tests untested patches of pull requests. -// TODO: test more pull requests at same time. -func TestPullRequests(ctx context.Context) { - - go func() { - prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking) - if err != nil { - log.Error("Find Checking PRs: %v", err) +// InitializePullRequests checks and tests untested patches of pull requests. +func InitializePullRequests(ctx context.Context) { + prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking) + if err != nil { + log.Error("Find Checking PRs: %v", err) + return + } + for _, prID := range prs { + select { + case <-ctx.Done(): return - } - for _, prID := range prs { - select { - case <-ctx.Done(): - return - default: - pullRequestQueue.Add(prID) + default: + if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error { + log.Trace("Adding PR ID: %d to the pull requests patch checking queue", prID) + return nil + }); err != nil { + log.Error("Error adding prID: %s to the pull requests patch checking queue %v", prID, err) } } - }() + } +} - // Start listening on new test requests. - for { - select { - case prID := <-pullRequestQueue.Queue(): - log.Trace("TestPullRequests[%v]: processing test task", prID) - pullRequestQueue.Remove(prID) +// handle passed PR IDs and test the PRs +func handle(data ...queue.Data) { + for _, datum := range data { + prID := datum.(string) + id := com.StrTo(prID).MustInt64() - id := com.StrTo(prID).MustInt64() + log.Trace("Testing PR ID %d from the pull requests patch checking queue", id) - pr, err := models.GetPullRequestByID(id) - if err != nil { - log.Error("GetPullRequestByID[%s]: %v", prID, err) - continue - } else if pr.Status != models.PullRequestStatusChecking { - continue - } else if manuallyMerged(pr) { - continue - } else if err = TestPatch(pr); err != nil { - log.Error("testPatch[%d]: %v", pr.ID, err) - pr.Status = models.PullRequestStatusError - if err := pr.UpdateCols("status"); err != nil { - log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err) - } - continue + pr, err := models.GetPullRequestByID(id) + if err != nil { + log.Error("GetPullRequestByID[%s]: %v", prID, err) + continue + } else if pr.Status != models.PullRequestStatusChecking { + continue + } else if manuallyMerged(pr) { + continue + } else if err = TestPatch(pr); err != nil { + log.Error("testPatch[%d]: %v", pr.ID, err) + pr.Status = models.PullRequestStatusError + if err := pr.UpdateCols("status"); err != nil { + log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err) } - checkAndUpdateStatus(pr) - case <-ctx.Done(): - pullRequestQueue.Close() - log.Info("PID: %d Pull Request testing shutdown", os.Getpid()) - return + continue } + checkAndUpdateStatus(pr) } } // Init runs the task queue to test all the checking status pull requests -func Init() { - go graceful.GetManager().RunWithShutdownContext(TestPullRequests) +func Init() error { + prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "").(queue.UniqueQueue) + + if prQueue == nil { + return fmt.Errorf("Unable to create pr_patch_checker Queue") + } + + go graceful.GetManager().RunWithShutdownFns(prQueue.Run) + go graceful.GetManager().RunWithShutdownContext(InitializePullRequests) + return nil } diff --git a/services/pull/check_test.go b/services/pull/check_test.go index 48a7774a61..4591edd7aa 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -6,29 +6,82 @@ package pull import ( + "context" "strconv" "testing" "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/queue" "github.com/stretchr/testify/assert" + "github.com/unknwon/com" ) func TestPullRequest_AddToTaskQueue(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) + idChan := make(chan int64, 10) + + q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) { + for _, datum := range data { + prID := datum.(string) + id := com.StrTo(prID).MustInt64() + idChan <- id + } + }, queue.ChannelUniqueQueueConfiguration{ + WorkerPoolConfiguration: queue.WorkerPoolConfiguration{ + QueueLength: 10, + BatchLength: 1, + }, + Workers: 1, + Name: "temporary-queue", + }, "") + assert.NoError(t, err) + + queueShutdown := []func(){} + queueTerminate := []func(){} + + prQueue = q.(queue.UniqueQueue) + pr := models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest) AddToTaskQueue(pr) + assert.Eventually(t, func() bool { + pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest) + return pr.Status == models.PullRequestStatusChecking + }, 1*time.Second, 100*time.Millisecond) + + has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10)) + assert.True(t, has) + assert.NoError(t, err) + + prQueue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(_ context.Context, terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + select { - case id := <-pullRequestQueue.Queue(): - assert.EqualValues(t, strconv.FormatInt(pr.ID, 10), id) + case id := <-idChan: + assert.EqualValues(t, pr.ID, id) case <-time.After(time.Second): assert.Fail(t, "Timeout: nothing was added to pullRequestQueue") } - assert.True(t, pullRequestQueue.Exist(pr.ID)) + has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10)) + assert.False(t, has) + assert.NoError(t, err) + pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest) assert.Equal(t, models.PullRequestStatusChecking, pr.Status) + + for _, callback := range queueShutdown { + callback() + } + for _, callback := range queueTerminate { + callback() + } + + prQueue = nil } |