summaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-02-02 23:19:58 +0000
committerGitHub <noreply@github.com>2020-02-02 23:19:58 +0000
commit2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch)
treed5ca361d9597e027ad92f1e02a841be1d266b554 /services
parentb4914249ee389a733e7dcfd2df20708ab3215827 (diff)
downloadgitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz
gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'services')
-rw-r--r--services/pull/check.go134
-rw-r--r--services/pull/check_test.go59
2 files changed, 133 insertions, 60 deletions
diff --git a/services/pull/check.go b/services/pull/check.go
index 5d380b4609..d64f49de3b 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -10,6 +10,7 @@ import (
"fmt"
"io/ioutil"
"os"
+ "strconv"
"strings"
"code.gitea.io/gitea/models"
@@ -17,24 +18,32 @@ import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
- "code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/sync"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/timeutil"
"github.com/unknwon/com"
)
-// pullRequestQueue represents a queue to handle update pull request tests
-var pullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength)
+// prQueue represents a queue to handle update pull request tests
+var prQueue queue.UniqueQueue
// AddToTaskQueue adds itself to pull request test task queue.
func AddToTaskQueue(pr *models.PullRequest) {
- go pullRequestQueue.AddFunc(pr.ID, func() {
- pr.Status = models.PullRequestStatusChecking
- if err := pr.UpdateCols("status"); err != nil {
- log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
+ go func() {
+ err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error {
+ pr.Status = models.PullRequestStatusChecking
+ err := pr.UpdateCols("status")
+ if err != nil {
+ log.Error("AddToTaskQueue.UpdateCols[%d].(add to queue): %v", pr.ID, err)
+ } else {
+ log.Trace("Adding PR ID: %d to the test pull requests queue", pr.ID)
+ }
+ return err
+ })
+ if err != nil && err != queue.ErrAlreadyInQueue {
+ log.Error("Error adding prID %d to the test pull requests queue: %v", pr.ID, err)
}
- })
+ }()
}
// checkAndUpdateStatus checks if pull request is possible to leaving checking status,
@@ -46,7 +55,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) {
}
// Make sure there is no waiting test to process before leaving the checking status.
- if !pullRequestQueue.Exist(pr.ID) {
+ has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
+ if err != nil {
+ log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err)
+ }
+
+ if !has {
if err := pr.UpdateCols("status, conflicted_files"); err != nil {
log.Error("Update[%d]: %v", pr.ID, err)
}
@@ -73,7 +87,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) {
headFile := pr.GetGitRefName()
// Check if a pull request is merged into BaseBranch
- _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
+ _, err = git.NewCommand("merge-base", "--is-ancestor", headFile, pr.BaseBranch).
+ RunInDirWithEnv(pr.BaseRepo.RepoPath(), []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
if err != nil {
// Errors are signaled by a non-zero status that is not 1
if strings.Contains(err.Error(), "exit status 1") {
@@ -93,7 +108,8 @@ func getMergeCommit(pr *models.PullRequest) (*git.Commit, error) {
cmd := commitID[:40] + ".." + pr.BaseBranch
// Get the commit from BaseBranch where the pull request got merged
- mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
+ mergeCommit, err := git.NewCommand("rev-list", "--ancestry-path", "--merges", "--reverse", cmd).
+ RunInDirWithEnv("", []string{"GIT_INDEX_FILE=" + indexTmpPath, "GIT_DIR=" + pr.BaseRepo.RepoPath()})
if err != nil {
return nil, fmt.Errorf("git rev-list --ancestry-path --merges --reverse: %v", err)
} else if len(mergeCommit) < 40 {
@@ -155,61 +171,65 @@ func manuallyMerged(pr *models.PullRequest) bool {
return false
}
-// TestPullRequests checks and tests untested patches of pull requests.
-// TODO: test more pull requests at same time.
-func TestPullRequests(ctx context.Context) {
-
- go func() {
- prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
- if err != nil {
- log.Error("Find Checking PRs: %v", err)
+// InitializePullRequests checks and tests untested patches of pull requests.
+func InitializePullRequests(ctx context.Context) {
+ prs, err := models.GetPullRequestIDsByCheckStatus(models.PullRequestStatusChecking)
+ if err != nil {
+ log.Error("Find Checking PRs: %v", err)
+ return
+ }
+ for _, prID := range prs {
+ select {
+ case <-ctx.Done():
return
- }
- for _, prID := range prs {
- select {
- case <-ctx.Done():
- return
- default:
- pullRequestQueue.Add(prID)
+ default:
+ if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error {
+ log.Trace("Adding PR ID: %d to the pull requests patch checking queue", prID)
+ return nil
+ }); err != nil {
+ log.Error("Error adding prID: %s to the pull requests patch checking queue %v", prID, err)
}
}
- }()
+ }
+}
- // Start listening on new test requests.
- for {
- select {
- case prID := <-pullRequestQueue.Queue():
- log.Trace("TestPullRequests[%v]: processing test task", prID)
- pullRequestQueue.Remove(prID)
+// handle passed PR IDs and test the PRs
+func handle(data ...queue.Data) {
+ for _, datum := range data {
+ prID := datum.(string)
+ id := com.StrTo(prID).MustInt64()
- id := com.StrTo(prID).MustInt64()
+ log.Trace("Testing PR ID %d from the pull requests patch checking queue", id)
- pr, err := models.GetPullRequestByID(id)
- if err != nil {
- log.Error("GetPullRequestByID[%s]: %v", prID, err)
- continue
- } else if pr.Status != models.PullRequestStatusChecking {
- continue
- } else if manuallyMerged(pr) {
- continue
- } else if err = TestPatch(pr); err != nil {
- log.Error("testPatch[%d]: %v", pr.ID, err)
- pr.Status = models.PullRequestStatusError
- if err := pr.UpdateCols("status"); err != nil {
- log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
- }
- continue
+ pr, err := models.GetPullRequestByID(id)
+ if err != nil {
+ log.Error("GetPullRequestByID[%s]: %v", prID, err)
+ continue
+ } else if pr.Status != models.PullRequestStatusChecking {
+ continue
+ } else if manuallyMerged(pr) {
+ continue
+ } else if err = TestPatch(pr); err != nil {
+ log.Error("testPatch[%d]: %v", pr.ID, err)
+ pr.Status = models.PullRequestStatusError
+ if err := pr.UpdateCols("status"); err != nil {
+ log.Error("update pr [%d] status to PullRequestStatusError failed: %v", pr.ID, err)
}
- checkAndUpdateStatus(pr)
- case <-ctx.Done():
- pullRequestQueue.Close()
- log.Info("PID: %d Pull Request testing shutdown", os.Getpid())
- return
+ continue
}
+ checkAndUpdateStatus(pr)
}
}
// Init runs the task queue to test all the checking status pull requests
-func Init() {
- go graceful.GetManager().RunWithShutdownContext(TestPullRequests)
+func Init() error {
+ prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "").(queue.UniqueQueue)
+
+ if prQueue == nil {
+ return fmt.Errorf("Unable to create pr_patch_checker Queue")
+ }
+
+ go graceful.GetManager().RunWithShutdownFns(prQueue.Run)
+ go graceful.GetManager().RunWithShutdownContext(InitializePullRequests)
+ return nil
}
diff --git a/services/pull/check_test.go b/services/pull/check_test.go
index 48a7774a61..4591edd7aa 100644
--- a/services/pull/check_test.go
+++ b/services/pull/check_test.go
@@ -6,29 +6,82 @@
package pull
import (
+ "context"
"strconv"
"testing"
"time"
"code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/queue"
"github.com/stretchr/testify/assert"
+ "github.com/unknwon/com"
)
func TestPullRequest_AddToTaskQueue(t *testing.T) {
assert.NoError(t, models.PrepareTestDatabase())
+ idChan := make(chan int64, 10)
+
+ q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) {
+ for _, datum := range data {
+ prID := datum.(string)
+ id := com.StrTo(prID).MustInt64()
+ idChan <- id
+ }
+ }, queue.ChannelUniqueQueueConfiguration{
+ WorkerPoolConfiguration: queue.WorkerPoolConfiguration{
+ QueueLength: 10,
+ BatchLength: 1,
+ },
+ Workers: 1,
+ Name: "temporary-queue",
+ }, "")
+ assert.NoError(t, err)
+
+ queueShutdown := []func(){}
+ queueTerminate := []func(){}
+
+ prQueue = q.(queue.UniqueQueue)
+
pr := models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
AddToTaskQueue(pr)
+ assert.Eventually(t, func() bool {
+ pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
+ return pr.Status == models.PullRequestStatusChecking
+ }, 1*time.Second, 100*time.Millisecond)
+
+ has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
+ assert.True(t, has)
+ assert.NoError(t, err)
+
+ prQueue.Run(func(_ context.Context, shutdown func()) {
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(_ context.Context, terminate func()) {
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
select {
- case id := <-pullRequestQueue.Queue():
- assert.EqualValues(t, strconv.FormatInt(pr.ID, 10), id)
+ case id := <-idChan:
+ assert.EqualValues(t, pr.ID, id)
case <-time.After(time.Second):
assert.Fail(t, "Timeout: nothing was added to pullRequestQueue")
}
- assert.True(t, pullRequestQueue.Exist(pr.ID))
+ has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10))
+ assert.False(t, has)
+ assert.NoError(t, err)
+
pr = models.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 1}).(*models.PullRequest)
assert.Equal(t, models.PullRequestStatusChecking, pr.Status)
+
+ for _, callback := range queueShutdown {
+ callback()
+ }
+ for _, callback := range queueTerminate {
+ callback()
+ }
+
+ prQueue = nil
}