You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

job_emitter.go 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Copyright 2022 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. actions_model "code.gitea.io/gitea/models/actions"
  9. "code.gitea.io/gitea/models/db"
  10. "code.gitea.io/gitea/modules/graceful"
  11. "code.gitea.io/gitea/modules/queue"
  12. "xorm.io/builder"
  13. )
  // jobEmitterQueue is the package-level unique queue onto which EmitJobsIfReady
  // pushes jobUpdate items. Its initialization is not part of this section.
  14. var jobEmitterQueue queue.UniqueQueue
  // jobUpdate is the item pushed onto jobEmitterQueue. It names the run whose
  // jobs should be re-checked by the queue handler.
  15. type jobUpdate struct {
  // RunID is the ID of the run that is handed to checkJobsOfRun.
  16. RunID int64
  17. }
  18. func EmitJobsIfReady(runID int64) error {
  19. err := jobEmitterQueue.Push(&jobUpdate{
  20. RunID: runID,
  21. })
  22. if errors.Is(err, queue.ErrAlreadyInQueue) {
  23. return nil
  24. }
  25. return err
  26. }
  27. func jobEmitterQueueHandle(data ...queue.Data) []queue.Data {
  28. ctx := graceful.GetManager().ShutdownContext()
  29. var ret []queue.Data
  30. for _, d := range data {
  31. update := d.(*jobUpdate)
  32. if err := checkJobsOfRun(ctx, update.RunID); err != nil {
  33. ret = append(ret, d)
  34. }
  35. }
  36. return ret
  37. }
  38. func checkJobsOfRun(ctx context.Context, runID int64) error {
  39. jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: runID})
  40. if err != nil {
  41. return err
  42. }
  43. if err := db.WithTx(ctx, func(ctx context.Context) error {
  44. idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
  45. for _, job := range jobs {
  46. idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
  47. }
  48. updates := newJobStatusResolver(jobs).Resolve()
  49. for _, job := range jobs {
  50. if status, ok := updates[job.ID]; ok {
  51. job.Status = status
  52. if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
  53. return err
  54. } else if n != 1 {
  55. return fmt.Errorf("no affected for updating blocked job %v", job.ID)
  56. }
  57. }
  58. }
  59. return nil
  60. }); err != nil {
  61. return err
  62. }
  63. CreateCommitStatus(ctx, jobs...)
  64. return nil
  65. }
  // jobStatusResolver works out new statuses for blocked jobs from the statuses
  // of the jobs they need.
  66. type jobStatusResolver struct {
  // statuses maps a job's ID to its current status; Resolve writes resolved
  // statuses back into it between passes.
  67. statuses map[int64]actions_model.Status
  // needs maps a job's ID to the IDs of the jobs it depends on.
  68. needs map[int64][]int64
  69. }
  70. func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
  71. idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
  72. for _, job := range jobs {
  73. idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
  74. }
  75. statuses := make(map[int64]actions_model.Status, len(jobs))
  76. needs := make(map[int64][]int64, len(jobs))
  77. for _, job := range jobs {
  78. statuses[job.ID] = job.Status
  79. for _, need := range job.Needs {
  80. for _, v := range idToJobs[need] {
  81. needs[job.ID] = append(needs[job.ID], v.ID)
  82. }
  83. }
  84. }
  85. return &jobStatusResolver{
  86. statuses: statuses,
  87. needs: needs,
  88. }
  89. }
  90. func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
  91. ret := map[int64]actions_model.Status{}
  92. for i := 0; i < len(r.statuses); i++ {
  93. updated := r.resolve()
  94. if len(updated) == 0 {
  95. return ret
  96. }
  97. for k, v := range updated {
  98. ret[k] = v
  99. r.statuses[k] = v
  100. }
  101. }
  102. return ret
  103. }
  104. func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
  105. ret := map[int64]actions_model.Status{}
  106. for id, status := range r.statuses {
  107. if status != actions_model.StatusBlocked {
  108. continue
  109. }
  110. allDone, allSucceed := true, true
  111. for _, need := range r.needs[id] {
  112. needStatus := r.statuses[need]
  113. if !needStatus.IsDone() {
  114. allDone = false
  115. }
  116. if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) {
  117. allSucceed = false
  118. }
  119. }
  120. if allDone {
  121. if allSucceed {
  122. ret[id] = actions_model.StatusWaiting
  123. } else {
  124. ret[id] = actions_model.StatusSkipped
  125. }
  126. }
  127. }
  128. return ret
  129. }