aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBo-Yi Wu <appleboy.tw@gmail.com>2023-07-25 11:15:55 +0800
committerGitHub <noreply@github.com>2023-07-25 11:15:55 +0800
commit44781f9f5c4ede618660d8cfe42437f0e8dc22a0 (patch)
treeefcdc2e7876ede8c7e721432760e6ef9cefe4806
parent5db640abcd8608b065a1b390404bba2233220c95 (diff)
downloadgitea-44781f9f5c4ede618660d8cfe42437f0e8dc22a0.tar.gz
gitea-44781f9f5c4ede618660d8cfe42437f0e8dc22a0.zip
Implement auto-cancellation of concurrent jobs if the event is push (#25716)
- cancel running jobs if the event is push - Add a new function `CancelRunningJobs` to cancel all running jobs of a run - Update `FindRunOptions` struct to include `Ref` field and update its condition in `toConds` function - Implement auto cancellation of running jobs in the same workflow in `notify` function related task: https://github.com/go-gitea/gitea/pull/22751/ --------- Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com> Signed-off-by: appleboy <appleboy.tw@gmail.com> Co-authored-by: Jason Song <i@wolfogre.com> Co-authored-by: delvh <dev.lh@web.de>
-rw-r--r--models/actions/run.go69
-rw-r--r--models/actions/run_list.go24
-rw-r--r--models/migrations/migrations.go2
-rw-r--r--models/migrations/v1_21/v268.go16
-rw-r--r--routers/web/repo/actions/actions.go12
-rw-r--r--services/actions/notifier_helper.go23
6 files changed, 127 insertions, 19 deletions
diff --git a/models/actions/run.go b/models/actions/run.go
index 5396c612f6..ab6e319b1c 100644
--- a/models/actions/run.go
+++ b/models/actions/run.go
@@ -34,7 +34,7 @@ type ActionRun struct {
Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository
TriggerUserID int64 `xorm:"index"`
TriggerUser *user_model.User `xorm:"-"`
- Ref string
+ Ref string `xorm:"index"` // the commit/tag/… that caused the run
CommitSHA string
IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow.
NeedApproval bool // may need approval if it's a fork pull request
@@ -164,6 +164,73 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
return err
}
+// CancelRunningJobs cancels all running and waiting jobs associated with a specific workflow.
+func CancelRunningJobs(ctx context.Context, repoID int64, ref, workflowID string) error {
+ // Find all runs in the specified repository, reference, and workflow with statuses 'Running' or 'Waiting'.
+ runs, total, err := FindRuns(ctx, FindRunOptions{
+ RepoID: repoID,
+ Ref: ref,
+ WorkflowID: workflowID,
+ Status: []Status{StatusRunning, StatusWaiting},
+ })
+ if err != nil {
+ return err
+ }
+
+ // If there are no runs found, there's no need to proceed with cancellation, so return nil.
+ if total == 0 {
+ return nil
+ }
+
+ // Iterate over each found run and cancel its associated jobs.
+ for _, run := range runs {
+ // Find all jobs associated with the current run.
+ jobs, _, err := FindRunJobs(ctx, FindRunJobOptions{
+ RunID: run.ID,
+ })
+ if err != nil {
+ return err
+ }
+
+ // Iterate over each job and attempt to cancel it.
+ for _, job := range jobs {
+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
+ status := job.Status
+ if status.IsDone() {
+ continue
+ }
+
+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
+ if job.TaskID == 0 {
+ job.Status = StatusCancelled
+ job.Stopped = timeutil.TimeStampNow()
+
+ // Update the job's status and stopped time in the database.
+ n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
+ if err != nil {
+ return err
+ }
+
+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
+ if n == 0 {
+ return fmt.Errorf("job has changed, try again")
+ }
+
+ // Continue with the next job.
+ continue
+ }
+
+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
+ if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
+ return err
+ }
+ }
+ }
+
+ // Return nil to indicate successful cancellation of all running and waiting jobs.
+ return nil
+}
+
// InsertRun inserts a run
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
ctx, commiter, err := db.TxContext(ctx)
diff --git a/models/actions/run_list.go b/models/actions/run_list.go
index 29ab193d57..db36f6df98 100644
--- a/models/actions/run_list.go
+++ b/models/actions/run_list.go
@@ -66,12 +66,13 @@ func (runs RunList) LoadRepos() error {
type FindRunOptions struct {
db.ListOptions
- RepoID int64
- OwnerID int64
- WorkflowFileName string
- TriggerUserID int64
- Approved bool // not util.OptionalBool, it works only when it's true
- Status Status
+ RepoID int64
+ OwnerID int64
+ WorkflowID string
+ Ref string // the commit/tag/… that caused this workflow
+ TriggerUserID int64
+ Approved bool // not util.OptionalBool, it works only when it's true
+ Status []Status
}
func (opts FindRunOptions) toConds() builder.Cond {
@@ -82,8 +83,8 @@ func (opts FindRunOptions) toConds() builder.Cond {
if opts.OwnerID > 0 {
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
}
- if opts.WorkflowFileName != "" {
- cond = cond.And(builder.Eq{"workflow_id": opts.WorkflowFileName})
+ if opts.WorkflowID != "" {
+ cond = cond.And(builder.Eq{"workflow_id": opts.WorkflowID})
}
if opts.TriggerUserID > 0 {
cond = cond.And(builder.Eq{"trigger_user_id": opts.TriggerUserID})
@@ -91,8 +92,11 @@ func (opts FindRunOptions) toConds() builder.Cond {
if opts.Approved {
cond = cond.And(builder.Gt{"approved_by": 0})
}
- if opts.Status > StatusUnknown {
- cond = cond.And(builder.Eq{"status": opts.Status})
+ if len(opts.Status) > 0 {
+ cond = cond.And(builder.In("status", opts.Status))
+ }
+ if opts.Ref != "" {
+ cond = cond.And(builder.Eq{"ref": opts.Ref})
}
return cond
}
diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go
index bfe4b56cd1..fc11d54071 100644
--- a/models/migrations/migrations.go
+++ b/models/migrations/migrations.go
@@ -517,6 +517,8 @@ var migrations = []Migration{
NewMigration("Reduce commit status", v1_21.ReduceCommitStatus),
// v267 -> v268
NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable),
+ // v268 -> v269
+ NewMigration("Update Action Ref", v1_21.UpdateActionsRefIndex),
}
// GetCurrentDBVersion returns the current db version
diff --git a/models/migrations/v1_21/v268.go b/models/migrations/v1_21/v268.go
new file mode 100644
index 0000000000..332793ff07
--- /dev/null
+++ b/models/migrations/v1_21/v268.go
@@ -0,0 +1,16 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package v1_21 //nolint
+
+import (
+ "xorm.io/xorm"
+)
+
+// UpdateActionsRefIndex updates the index of actions ref field
+func UpdateActionsRefIndex(x *xorm.Engine) error {
+ type ActionRun struct {
+ Ref string `xorm:"index"` // the commit/tag/… causing the run
+ }
+ return x.Sync(new(ActionRun))
+}
diff --git a/routers/web/repo/actions/actions.go b/routers/web/repo/actions/actions.go
index d215201bcd..5a12f52dcd 100644
--- a/routers/web/repo/actions/actions.go
+++ b/routers/web/repo/actions/actions.go
@@ -150,10 +150,14 @@ func List(ctx *context.Context) {
Page: page,
PageSize: convert.ToCorrectPageSize(ctx.FormInt("limit")),
},
- RepoID: ctx.Repo.Repository.ID,
- WorkflowFileName: workflow,
- TriggerUserID: actorID,
- Status: actions_model.Status(status),
+ RepoID: ctx.Repo.Repository.ID,
+ WorkflowID: workflow,
+ TriggerUserID: actorID,
+ }
+
+ // if status is not StatusUnknown, it means user has selected a status filter
+ if actions_model.Status(status) != actions_model.StatusUnknown {
+ opts.Status = []actions_model.Status{actions_model.Status(status)}
}
runs, total, err := actions_model.FindRuns(ctx, opts)
diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go
index 90ad3001ba..764d24a7db 100644
--- a/services/actions/notifier_helper.go
+++ b/services/actions/notifier_helper.go
@@ -230,16 +230,31 @@ func notify(ctx context.Context, input *notifyInput) error {
log.Error("jobparser.Parse: %v", err)
continue
}
+
+ // cancel running jobs if the event is push
+ if run.Event == webhook_module.HookEventPush {
+ // cancel running jobs of the same workflow
+ if err := actions_model.CancelRunningJobs(
+ ctx,
+ run.RepoID,
+ run.Ref,
+ run.WorkflowID,
+ ); err != nil {
+ log.Error("CancelRunningJobs: %v", err)
+ }
+ }
+
if err := actions_model.InsertRun(ctx, run, jobs); err != nil {
log.Error("InsertRun: %v", err)
continue
}
- if jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID}); err != nil {
+
+ alljobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID})
+ if err != nil {
log.Error("FindRunJobs: %v", err)
- } else {
- CreateCommitStatus(ctx, jobs...)
+ continue
}
-
+ CreateCommitStatus(ctx, alljobs...)
}
return nil
}