aboutsummaryrefslogtreecommitdiffstats
path: root/models/actions/task.go
diff options
context:
space:
mode:
Diffstat (limited to 'models/actions/task.go')
-rw-r--r--models/actions/task.go130
1 files changed, 63 insertions, 67 deletions
diff --git a/models/actions/task.go b/models/actions/task.go
index 43f11b2730..c1306a8418 100644
--- a/models/actions/task.go
+++ b/models/actions/task.go
@@ -278,14 +278,13 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
return nil, false, err
}
- var workflowJob *jobparser.Job
- if gots, err := jobparser.Parse(job.WorkflowPayload); err != nil {
+ parsedWorkflows, err := jobparser.Parse(job.WorkflowPayload)
+ if err != nil {
return nil, false, fmt.Errorf("parse workflow of job %d: %w", job.ID, err)
- } else if len(gots) != 1 {
+ } else if len(parsedWorkflows) != 1 {
return nil, false, fmt.Errorf("workflow of job %d: not single workflow", job.ID)
- } else { //nolint:revive
- _, workflowJob = gots[0].Job()
}
+ _, workflowJob := parsedWorkflows[0].Job()
if _, err := e.Insert(task); err != nil {
return nil, false, err
@@ -336,6 +335,11 @@ func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error {
sess.Cols(cols...)
}
_, err := sess.Update(task)
+
+ // Automatically delete the ephemeral runner if the task is done
+ if err == nil && task.Status.IsDone() && util.SliceContainsString(cols, "status") {
+ return DeleteEphemeralRunner(ctx, task.RunnerID)
+ }
return err
}
@@ -348,78 +352,70 @@ func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.Task
stepStates[v.Id] = v
}
- ctx, committer, err := db.TxContext(ctx)
- if err != nil {
- return nil, err
- }
- defer committer.Close()
-
- e := db.GetEngine(ctx)
-
- task := &ActionTask{}
- if has, err := e.ID(state.Id).Get(task); err != nil {
- return nil, err
- } else if !has {
- return nil, util.ErrNotExist
- } else if runnerID != task.RunnerID {
- return nil, errors.New("invalid runner for task")
- }
-
- if task.Status.IsDone() {
- // the state is final, do nothing
- return task, nil
- }
+ return db.WithTx2(ctx, func(ctx context.Context) (*ActionTask, error) {
+ e := db.GetEngine(ctx)
- // state.Result is not unspecified means the task is finished
- if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
- task.Status = Status(state.Result)
- task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
- if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
- return nil, err
- }
- if _, err := UpdateRunJob(ctx, &ActionRunJob{
- ID: task.JobID,
- Status: task.Status,
- Stopped: task.Stopped,
- }, nil); err != nil {
+ task := &ActionTask{}
+ if has, err := e.ID(state.Id).Get(task); err != nil {
return nil, err
+ } else if !has {
+ return nil, util.ErrNotExist
+ } else if runnerID != task.RunnerID {
+ return nil, errors.New("invalid runner for task")
}
- } else {
- // Force update ActionTask.Updated to avoid the task being judged as a zombie task
- task.Updated = timeutil.TimeStampNow()
- if err := UpdateTask(ctx, task, "updated"); err != nil {
- return nil, err
- }
- }
- if err := task.LoadAttributes(ctx); err != nil {
- return nil, err
- }
-
- for _, step := range task.Steps {
- var result runnerv1.Result
- if v, ok := stepStates[step.Index]; ok {
- result = v.Result
- step.LogIndex = v.LogIndex
- step.LogLength = v.LogLength
- step.Started = convertTimestamp(v.StartedAt)
- step.Stopped = convertTimestamp(v.StoppedAt)
+ if task.Status.IsDone() {
+ // the state is final, do nothing
+ return task, nil
}
- if result != runnerv1.Result_RESULT_UNSPECIFIED {
- step.Status = Status(result)
- } else if step.Started != 0 {
- step.Status = StatusRunning
+
+ // state.Result is not unspecified means the task is finished
+ if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
+ task.Status = Status(state.Result)
+ task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
+ if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
+ return nil, err
+ }
+ if _, err := UpdateRunJob(ctx, &ActionRunJob{
+ ID: task.JobID,
+ Status: task.Status,
+ Stopped: task.Stopped,
+ }, nil); err != nil {
+ return nil, err
+ }
+ } else {
+ // Force update ActionTask.Updated to avoid the task being judged as a zombie task
+ task.Updated = timeutil.TimeStampNow()
+ if err := UpdateTask(ctx, task, "updated"); err != nil {
+ return nil, err
+ }
}
- if _, err := e.ID(step.ID).Update(step); err != nil {
+
+ if err := task.LoadAttributes(ctx); err != nil {
return nil, err
}
- }
- if err := committer.Commit(); err != nil {
- return nil, err
- }
+ for _, step := range task.Steps {
+ var result runnerv1.Result
+ if v, ok := stepStates[step.Index]; ok {
+ result = v.Result
+ step.LogIndex = v.LogIndex
+ step.LogLength = v.LogLength
+ step.Started = convertTimestamp(v.StartedAt)
+ step.Stopped = convertTimestamp(v.StoppedAt)
+ }
+ if result != runnerv1.Result_RESULT_UNSPECIFIED {
+ step.Status = Status(result)
+ } else if step.Started != 0 {
+ step.Status = StatusRunning
+ }
+ if _, err := e.ID(step.ID).Update(step); err != nil {
+ return nil, err
+ }
+ }
- return task, nil
+ return task, nil
+ })
}
func StopTask(ctx context.Context, taskID int64, status Status) error {