aboutsummaryrefslogtreecommitdiffstats
path: root/services/actions/job_emitter.go
diff options
context:
space:
mode:
Diffstat (limited to 'services/actions/job_emitter.go')
-rw-r--r--services/actions/job_emitter.go140
1 files changed, 140 insertions, 0 deletions
diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go
new file mode 100644
index 0000000000..cb2cc8d1ac
--- /dev/null
+++ b/services/actions/job_emitter.go
@@ -0,0 +1,140 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ actions_model "code.gitea.io/gitea/models/actions"
+ "code.gitea.io/gitea/models/db"
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/queue"
+
+ "xorm.io/builder"
+)
+
+var jobEmitterQueue queue.UniqueQueue
+
+type jobUpdate struct {
+ RunID int64
+}
+
+func EmitJobsIfReady(runID int64) error {
+ err := jobEmitterQueue.Push(&jobUpdate{
+ RunID: runID,
+ })
+ if errors.Is(err, queue.ErrAlreadyInQueue) {
+ return nil
+ }
+ return err
+}
+
+func jobEmitterQueueHandle(data ...queue.Data) []queue.Data {
+ ctx := graceful.GetManager().ShutdownContext()
+ var ret []queue.Data
+ for _, d := range data {
+ update := d.(*jobUpdate)
+ if err := checkJobsOfRun(ctx, update.RunID); err != nil {
+ ret = append(ret, d)
+ }
+ }
+ return ret
+}
+
+func checkJobsOfRun(ctx context.Context, runID int64) error {
+ return db.WithTx(ctx, func(ctx context.Context) error {
+ jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: runID})
+ if err != nil {
+ return err
+ }
+ idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
+ for _, job := range jobs {
+ idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
+ }
+
+ updates := newJobStatusResolver(jobs).Resolve()
+ for _, job := range jobs {
+ if status, ok := updates[job.ID]; ok {
+ job.Status = status
+ if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
+ return err
+ } else if n != 1 {
+ return fmt.Errorf("no affected for updating blocked job %v", job.ID)
+ }
+ }
+ }
+ return nil
+ })
+}
+
+type jobStatusResolver struct {
+ statuses map[int64]actions_model.Status
+ needs map[int64][]int64
+}
+
+func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
+ idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
+ for _, job := range jobs {
+ idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
+ }
+
+ statuses := make(map[int64]actions_model.Status, len(jobs))
+ needs := make(map[int64][]int64, len(jobs))
+ for _, job := range jobs {
+ statuses[job.ID] = job.Status
+ for _, need := range job.Needs {
+ for _, v := range idToJobs[need] {
+ needs[job.ID] = append(needs[job.ID], v.ID)
+ }
+ }
+ }
+ return &jobStatusResolver{
+ statuses: statuses,
+ needs: needs,
+ }
+}
+
+func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
+ ret := map[int64]actions_model.Status{}
+ for i := 0; i < len(r.statuses); i++ {
+ updated := r.resolve()
+ if len(updated) == 0 {
+ return ret
+ }
+ for k, v := range updated {
+ ret[k] = v
+ r.statuses[k] = v
+ }
+ }
+ return ret
+}
+
+func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
+ ret := map[int64]actions_model.Status{}
+ for id, status := range r.statuses {
+ if status != actions_model.StatusBlocked {
+ continue
+ }
+ allDone, allSucceed := true, true
+ for _, need := range r.needs[id] {
+ needStatus := r.statuses[need]
+ if !needStatus.IsDone() {
+ allDone = false
+ }
+ if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) {
+ allSucceed = false
+ }
+ }
+ if allDone {
+ if allSucceed {
+ ret[id] = actions_model.StatusWaiting
+ } else {
+ ret[id] = actions_model.StatusSkipped
+ }
+ }
+ }
+ return ret
+}