aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--models/actions/run.go10
-rw-r--r--models/actions/run_job.go7
-rw-r--r--models/actions/task.go3
-rw-r--r--models/actions/tasks_version.go105
-rw-r--r--models/migrations/migrations.go2
-rw-r--r--models/migrations/v1_21/v267.go23
-rw-r--r--routers/api/actions/runner/runner.go33
7 files changed, 174 insertions, 9 deletions
diff --git a/models/actions/run.go b/models/actions/run.go
index 7b62ff884f..5396c612f6 100644
--- a/models/actions/run.go
+++ b/models/actions/run.go
@@ -195,6 +195,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
}
runJobs := make([]*ActionRunJob, 0, len(jobs))
+ var hasWaiting bool
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
@@ -205,6 +206,8 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
status := StatusWaiting
if len(needs) > 0 || run.NeedApproval {
status = StatusBlocked
+ } else {
+ hasWaiting = true
}
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
runJobs = append(runJobs, &ActionRunJob{
@@ -225,6 +228,13 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
return err
}
+ // if there is a job in the waiting status, increase tasks version.
+ if hasWaiting {
+ if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
+ return err
+ }
+ }
+
return commiter.Commit()
}
diff --git a/models/actions/run_job.go b/models/actions/run_job.go
index 0002e50770..c7620cd8bc 100644
--- a/models/actions/run_job.go
+++ b/models/actions/run_job.go
@@ -111,6 +111,13 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
return affected, nil
}
+ if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() {
+ // if the status of job changes to waiting again, increase tasks version.
+ if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
+ return affected, err
+ }
+ }
+
if job.RunID == 0 {
var err error
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
diff --git a/models/actions/task.go b/models/actions/task.go
index 55044ec82d..9cc0fd0df8 100644
--- a/models/actions/task.go
+++ b/models/actions/task.go
@@ -215,12 +215,11 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro
}
func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
- dbCtx, commiter, err := db.TxContext(ctx)
+ ctx, commiter, err := db.TxContext(ctx)
if err != nil {
return nil, false, err
}
defer commiter.Close()
- ctx = dbCtx.WithContext(ctx)
e := db.GetEngine(ctx)
diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go
new file mode 100644
index 0000000000..5c0a86538d
--- /dev/null
+++ b/models/actions/tasks_version.go
@@ -0,0 +1,105 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+
+ "code.gitea.io/gitea/models/db"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/timeutil"
+)
+
+// ActionTasksVersion
+// If both ownerID and repoID is zero, its scope is global.
+// If ownerID is not zero and repoID is zero, its scope is org (there is no user-level runner currrently).
+// If ownerID is zero and repoID is not zero, its scope is repo.
+type ActionTasksVersion struct {
+ ID int64 `xorm:"pk autoincr"`
+ OwnerID int64 `xorm:"UNIQUE(owner_repo)"`
+ RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"`
+ Version int64
+ CreatedUnix timeutil.TimeStamp `xorm:"created"`
+ UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
+}
+
+func init() {
+ db.RegisterModel(new(ActionTasksVersion))
+}
+
+func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, error) {
+ var tasksVersion ActionTasksVersion
+ has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion)
+ if err != nil {
+ return 0, err
+ } else if !has {
+ return 0, nil
+ }
+ return tasksVersion.Version, err
+}
+
+func insertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) {
+ tasksVersion := &ActionTasksVersion{
+ OwnerID: ownerID,
+ RepoID: repoID,
+ Version: 1,
+ }
+ if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil {
+ return nil, err
+ }
+ return tasksVersion, nil
+}
+
+func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) error {
+ result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID)
+ if err != nil {
+ return err
+ }
+ affected, err := result.RowsAffected()
+ if err != nil {
+ return err
+ }
+
+ if affected == 0 {
+ // if update sql does not affect any rows, the database may be broken,
+ // so re-insert the row of version data here.
+ if _, err := insertTasksVersion(ctx, ownerID, repoID); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error {
+ ctx, commiter, err := db.TxContext(ctx)
+ if err != nil {
+ return err
+ }
+ defer commiter.Close()
+
+ // 1. increase global
+ if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil {
+ log.Error("IncreaseTasksVersionByScope(Global): %v", err)
+ return err
+ }
+
+ // 2. increase owner
+ if ownerID > 0 {
+ if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil {
+ log.Error("IncreaseTasksVersionByScope(Owner): %v", err)
+ return err
+ }
+ }
+
+ // 3. increase repo
+ if repoID > 0 {
+ if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil {
+ log.Error("IncreaseTasksVersionByScope(Repo): %v", err)
+ return err
+ }
+ }
+
+ return commiter.Commit()
+}
diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go
index 6599cb9cda..bfe4b56cd1 100644
--- a/models/migrations/migrations.go
+++ b/models/migrations/migrations.go
@@ -515,6 +515,8 @@ var migrations = []Migration{
NewMigration("Alter Actions Artifact table", v1_21.AlterActionArtifactTable),
// v266 -> v267
NewMigration("Reduce commit status", v1_21.ReduceCommitStatus),
+ // v267 -> v268
+ NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable),
}
// GetCurrentDBVersion returns the current db version
diff --git a/models/migrations/v1_21/v267.go b/models/migrations/v1_21/v267.go
new file mode 100644
index 0000000000..bc0e954bdc
--- /dev/null
+++ b/models/migrations/v1_21/v267.go
@@ -0,0 +1,23 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package v1_21 //nolint
+
+import (
+ "code.gitea.io/gitea/modules/timeutil"
+
+ "xorm.io/xorm"
+)
+
+func CreateActionTasksVersionTable(x *xorm.Engine) error {
+ type ActionTasksVersion struct {
+ ID int64 `xorm:"pk autoincr"`
+ OwnerID int64 `xorm:"UNIQUE(owner_repo)"`
+ RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"`
+ Version int64
+ CreatedUnix timeutil.TimeStamp `xorm:"created"`
+ UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
+ }
+
+ return x.Sync(new(ActionTasksVersion))
+}
diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go
index 17801cb322..6de5964cb7 100644
--- a/routers/api/actions/runner/runner.go
+++ b/routers/api/actions/runner/runner.go
@@ -127,20 +127,39 @@ func (s *Service) Declare(
// FetchTask assigns a task to the runner
func (s *Service) FetchTask(
ctx context.Context,
- _ *connect.Request[runnerv1.FetchTaskRequest],
+ req *connect.Request[runnerv1.FetchTaskRequest],
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
runner := GetRunner(ctx)
var task *runnerv1.Task
- if t, ok, err := pickTask(ctx, runner); err != nil {
- log.Error("pick task failed: %v", err)
- return nil, status.Errorf(codes.Internal, "pick task: %v", err)
- } else if ok {
- task = t
+ tasksVersion := req.Msg.TasksVersion // task version from runner
+ latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
+ } else if latestVersion == 0 {
+ if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
+ return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
+ }
+ // if we don't increase the value of `latestVersion` here,
+ // the response of FetchTask will return tasksVersion as zero.
+ // and the runner will treat it as an old version of Gitea.
+ latestVersion++
}
+ if tasksVersion != latestVersion {
+ // if the task version in request is not equal to the version in db,
+ // it means there may still be some tasks not be assgined.
+ // try to pick a task for the runner that send the request.
+ if t, ok, err := pickTask(ctx, runner); err != nil {
+ log.Error("pick task failed: %v", err)
+ return nil, status.Errorf(codes.Internal, "pick task: %v", err)
+ } else if ok {
+ task = t
+ }
+ }
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
- Task: task,
+ Task: task,
+ TasksVersion: latestVersion,
})
return res, nil
}