You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

run_job.go 4.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Copyright 2022 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "fmt"
  7. "slices"
  8. "time"
  9. "code.gitea.io/gitea/models/db"
  10. "code.gitea.io/gitea/modules/timeutil"
  11. "code.gitea.io/gitea/modules/util"
  12. "xorm.io/builder"
  13. )
  // ActionRunJob represents a job of a run.
  // It is registered as a database model in init; the xorm struct tags below
  // control how each field is mapped.
  type ActionRunJob struct {
  	ID    int64
  	RunID int64 `xorm:"index"` // ID of the run this job belongs to; used by LoadRun and GetRunJobsByRunID
  	// Run is excluded from the table mapping (`xorm:"-"`); it is filled in lazily by LoadRun.
  	Run               *ActionRun `xorm:"-"`
  	RepoID            int64      `xorm:"index"`
  	OwnerID           int64      `xorm:"index"`
  	CommitSHA         string     `xorm:"index"`
  	IsForkPullRequest bool
  	Name              string `xorm:"VARCHAR(255)"`
  	Attempt           int64
  	WorkflowPayload   []byte
  	JobID             string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
  	// NOTE(review): Needs presumably lists the workflow job ids this job depends on — confirm against callers.
  	Needs  []string `xorm:"JSON TEXT"`
  	RunsOn []string `xorm:"JSON TEXT"`
  	TaskID int64    // the latest task of the job
  	// Status of this job; UpdateRunJob aggregates the statuses of all jobs of a run into the run's status.
  	Status Status `xorm:"index"`
  	// Started and Stopped, together with Status, are the inputs Duration passes to calculateDuration.
  	Started timeutil.TimeStamp
  	Stopped timeutil.TimeStamp
  	Created timeutil.TimeStamp `xorm:"created"`
  	Updated timeutil.TimeStamp `xorm:"updated index"`
  }
  36. func init() {
  37. db.RegisterModel(new(ActionRunJob))
  38. }
  39. func (job *ActionRunJob) Duration() time.Duration {
  40. return calculateDuration(job.Started, job.Stopped, job.Status)
  41. }
  42. func (job *ActionRunJob) LoadRun(ctx context.Context) error {
  43. if job.Run == nil {
  44. run, err := GetRunByID(ctx, job.RunID)
  45. if err != nil {
  46. return err
  47. }
  48. job.Run = run
  49. }
  50. return nil
  51. }
  52. // LoadAttributes load Run if not loaded
  53. func (job *ActionRunJob) LoadAttributes(ctx context.Context) error {
  54. if job == nil {
  55. return nil
  56. }
  57. if err := job.LoadRun(ctx); err != nil {
  58. return err
  59. }
  60. return job.Run.LoadAttributes(ctx)
  61. }
  62. func GetRunJobByID(ctx context.Context, id int64) (*ActionRunJob, error) {
  63. var job ActionRunJob
  64. has, err := db.GetEngine(ctx).Where("id=?", id).Get(&job)
  65. if err != nil {
  66. return nil, err
  67. } else if !has {
  68. return nil, fmt.Errorf("run job with id %d: %w", id, util.ErrNotExist)
  69. }
  70. return &job, nil
  71. }
  72. func GetRunJobsByRunID(ctx context.Context, runID int64) ([]*ActionRunJob, error) {
  73. var jobs []*ActionRunJob
  74. if err := db.GetEngine(ctx).Where("run_id=?", runID).OrderBy("id").Find(&jobs); err != nil {
  75. return nil, err
  76. }
  77. return jobs, nil
  78. }
  79. func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) {
  80. e := db.GetEngine(ctx)
  81. sess := e.ID(job.ID)
  82. if len(cols) > 0 {
  83. sess.Cols(cols...)
  84. }
  85. if cond != nil {
  86. sess.Where(cond)
  87. }
  88. affected, err := sess.Update(job)
  89. if err != nil {
  90. return 0, err
  91. }
  92. if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) {
  93. return affected, nil
  94. }
  95. if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() {
  96. // if the status of job changes to waiting again, increase tasks version.
  97. if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
  98. return 0, err
  99. }
  100. }
  101. if job.RunID == 0 {
  102. var err error
  103. if job, err = GetRunJobByID(ctx, job.ID); err != nil {
  104. return 0, err
  105. }
  106. }
  107. {
  108. // Other goroutines may aggregate the status of the run and update it too.
  109. // So we need load the run and its jobs before updating the run.
  110. run, err := GetRunByID(ctx, job.RunID)
  111. if err != nil {
  112. return 0, err
  113. }
  114. jobs, err := GetRunJobsByRunID(ctx, job.RunID)
  115. if err != nil {
  116. return 0, err
  117. }
  118. run.Status = aggregateJobStatus(jobs)
  119. if run.Started.IsZero() && run.Status.IsRunning() {
  120. run.Started = timeutil.TimeStampNow()
  121. }
  122. if run.Stopped.IsZero() && run.Status.IsDone() {
  123. run.Stopped = timeutil.TimeStampNow()
  124. }
  125. if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil {
  126. return 0, fmt.Errorf("update run %d: %w", run.ID, err)
  127. }
  128. }
  129. return affected, nil
  130. }
  131. func aggregateJobStatus(jobs []*ActionRunJob) Status {
  132. allDone := true
  133. allWaiting := true
  134. hasFailure := false
  135. for _, job := range jobs {
  136. if !job.Status.IsDone() {
  137. allDone = false
  138. }
  139. if job.Status != StatusWaiting && !job.Status.IsDone() {
  140. allWaiting = false
  141. }
  142. if job.Status == StatusFailure || job.Status == StatusCancelled {
  143. hasFailure = true
  144. }
  145. }
  146. if allDone {
  147. if hasFailure {
  148. return StatusFailure
  149. }
  150. return StatusSuccess
  151. }
  152. if allWaiting {
  153. return StatusWaiting
  154. }
  155. return StatusRunning
  156. }