You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

job_emitter.go 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. // Copyright 2022 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "strings"
  9. actions_model "code.gitea.io/gitea/models/actions"
  10. "code.gitea.io/gitea/models/db"
  11. "code.gitea.io/gitea/modules/graceful"
  12. "code.gitea.io/gitea/modules/queue"
  13. "github.com/nektos/act/pkg/jobparser"
  14. "xorm.io/builder"
  15. )
  16. var jobEmitterQueue *queue.WorkerPoolQueue[*jobUpdate]
  // jobUpdate is the item carried by jobEmitterQueue. It identifies, by RunID,
  // the run whose jobs should be re-checked.
  type jobUpdate struct{ RunID int64 }
  20. func EmitJobsIfReady(runID int64) error {
  21. err := jobEmitterQueue.Push(&jobUpdate{
  22. RunID: runID,
  23. })
  24. if errors.Is(err, queue.ErrAlreadyInQueue) {
  25. return nil
  26. }
  27. return err
  28. }
  29. func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
  30. ctx := graceful.GetManager().ShutdownContext()
  31. var ret []*jobUpdate
  32. for _, update := range items {
  33. if err := checkJobsOfRun(ctx, update.RunID); err != nil {
  34. ret = append(ret, update)
  35. }
  36. }
  37. return ret
  38. }
  39. func checkJobsOfRun(ctx context.Context, runID int64) error {
  40. jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: runID})
  41. if err != nil {
  42. return err
  43. }
  44. if err := db.WithTx(ctx, func(ctx context.Context) error {
  45. idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
  46. for _, job := range jobs {
  47. idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
  48. }
  49. updates := newJobStatusResolver(jobs).Resolve()
  50. for _, job := range jobs {
  51. if status, ok := updates[job.ID]; ok {
  52. job.Status = status
  53. if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
  54. return err
  55. } else if n != 1 {
  56. return fmt.Errorf("no affected for updating blocked job %v", job.ID)
  57. }
  58. }
  59. }
  60. return nil
  61. }); err != nil {
  62. return err
  63. }
  64. CreateCommitStatus(ctx, jobs...)
  65. return nil
  66. }
  67. type jobStatusResolver struct {
  68. statuses map[int64]actions_model.Status
  69. needs map[int64][]int64
  70. jobMap map[int64]*actions_model.ActionRunJob
  71. }
  72. func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
  73. idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
  74. jobMap := make(map[int64]*actions_model.ActionRunJob)
  75. for _, job := range jobs {
  76. idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
  77. jobMap[job.ID] = job
  78. }
  79. statuses := make(map[int64]actions_model.Status, len(jobs))
  80. needs := make(map[int64][]int64, len(jobs))
  81. for _, job := range jobs {
  82. statuses[job.ID] = job.Status
  83. for _, need := range job.Needs {
  84. for _, v := range idToJobs[need] {
  85. needs[job.ID] = append(needs[job.ID], v.ID)
  86. }
  87. }
  88. }
  89. return &jobStatusResolver{
  90. statuses: statuses,
  91. needs: needs,
  92. jobMap: jobMap,
  93. }
  94. }
  95. func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
  96. ret := map[int64]actions_model.Status{}
  97. for i := 0; i < len(r.statuses); i++ {
  98. updated := r.resolve()
  99. if len(updated) == 0 {
  100. return ret
  101. }
  102. for k, v := range updated {
  103. ret[k] = v
  104. r.statuses[k] = v
  105. }
  106. }
  107. return ret
  108. }
  109. func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
  110. ret := map[int64]actions_model.Status{}
  111. for id, status := range r.statuses {
  112. if status != actions_model.StatusBlocked {
  113. continue
  114. }
  115. allDone, allSucceed := true, true
  116. for _, need := range r.needs[id] {
  117. needStatus := r.statuses[need]
  118. if !needStatus.IsDone() {
  119. allDone = false
  120. }
  121. if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) {
  122. allSucceed = false
  123. }
  124. }
  125. if allDone {
  126. if allSucceed {
  127. ret[id] = actions_model.StatusWaiting
  128. } else {
  129. // If a job's "if" condition is "always()", the job should always run even if some of its dependencies did not succeed.
  130. // See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idneeds
  131. always := false
  132. if wfJobs, _ := jobparser.Parse(r.jobMap[id].WorkflowPayload); len(wfJobs) == 1 {
  133. _, wfJob := wfJobs[0].Job()
  134. expr := strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(wfJob.If.Value, "${{"), "}}"))
  135. always = expr == "always()"
  136. }
  137. if always {
  138. ret[id] = actions_model.StatusWaiting
  139. } else {
  140. ret[id] = actions_model.StatusSkipped
  141. }
  142. }
  143. }
  144. }
  145. return ret
  146. }