diff options
Diffstat (limited to 'services/actions/job_emitter.go')
-rw-r--r-- | services/actions/job_emitter.go | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go new file mode 100644 index 0000000000..cb2cc8d1ac --- /dev/null +++ b/services/actions/job_emitter.go @@ -0,0 +1,140 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "errors" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/queue" + + "xorm.io/builder" +) + +var jobEmitterQueue queue.UniqueQueue + +type jobUpdate struct { + RunID int64 +} + +func EmitJobsIfReady(runID int64) error { + err := jobEmitterQueue.Push(&jobUpdate{ + RunID: runID, + }) + if errors.Is(err, queue.ErrAlreadyInQueue) { + return nil + } + return err +} + +func jobEmitterQueueHandle(data ...queue.Data) []queue.Data { + ctx := graceful.GetManager().ShutdownContext() + var ret []queue.Data + for _, d := range data { + update := d.(*jobUpdate) + if err := checkJobsOfRun(ctx, update.RunID); err != nil { + ret = append(ret, d) + } + } + return ret +} + +func checkJobsOfRun(ctx context.Context, runID int64) error { + return db.WithTx(ctx, func(ctx context.Context) error { + jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: runID}) + if err != nil { + return err + } + idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) + for _, job := range jobs { + idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + } + + updates := newJobStatusResolver(jobs).Resolve() + for _, job := range jobs { + if status, ok := updates[job.ID]; ok { + job.Status = status + if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { + return err + } else if n != 1 { + return fmt.Errorf("no affected for updating blocked job %v", job.ID) + } + } + } + return nil + }) +} + +type jobStatusResolver struct { + statuses map[int64]actions_model.Status + needs map[int64][]int64 +} + +func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { + idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) + for _, job := range jobs { + idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + } + + statuses := make(map[int64]actions_model.Status, len(jobs)) + needs := make(map[int64][]int64, len(jobs)) + for _, job := range jobs { + statuses[job.ID] = job.Status + for _, need := range job.Needs { + for _, v := range idToJobs[need] { + needs[job.ID] = append(needs[job.ID], v.ID) + } + } + } + return &jobStatusResolver{ + statuses: statuses, + needs: needs, + } +} + +func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { + ret := map[int64]actions_model.Status{} + for i := 0; i < len(r.statuses); i++ { + updated := r.resolve() + if len(updated) == 0 { + return ret + } + for k, v := range updated { + ret[k] = v + r.statuses[k] = v + } + } + return ret +} + +func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { + ret := map[int64]actions_model.Status{} + for id, status := range r.statuses { + if status != actions_model.StatusBlocked { + continue + } + allDone, allSucceed := true, true + for _, need := range r.needs[id] { + needStatus := r.statuses[need] + if !needStatus.IsDone() { + allDone = false + } + if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) { + allSucceed = false + } + } + if allDone { + if allSucceed { + ret[id] = actions_model.StatusWaiting + } else { + ret[id] = actions_model.StatusSkipped + } + } + } + return ret +} |