Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // Copyright 2022 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "fmt"
  7. "slices"
  8. "strings"
  9. "time"
  10. "code.gitea.io/gitea/models/db"
  11. repo_model "code.gitea.io/gitea/models/repo"
  12. user_model "code.gitea.io/gitea/models/user"
  13. "code.gitea.io/gitea/modules/git"
  14. "code.gitea.io/gitea/modules/json"
  15. api "code.gitea.io/gitea/modules/structs"
  16. "code.gitea.io/gitea/modules/timeutil"
  17. "code.gitea.io/gitea/modules/util"
  18. webhook_module "code.gitea.io/gitea/modules/webhook"
  19. "github.com/nektos/act/pkg/jobparser"
  20. "xorm.io/builder"
  21. )
  22. // ActionRun represents a run of a workflow file
  23. type ActionRun struct {
  24. ID int64
  25. Title string
  26. RepoID int64 `xorm:"index unique(repo_index)"`
  27. Repo *repo_model.Repository `xorm:"-"`
  28. OwnerID int64 `xorm:"index"`
  29. WorkflowID string `xorm:"index"` // the name of workflow file
  30. Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository
  31. TriggerUserID int64 `xorm:"index"`
  32. TriggerUser *user_model.User `xorm:"-"`
  33. ScheduleID int64
  34. Ref string `xorm:"index"` // the commit/tag/… that caused the run
  35. CommitSHA string
  36. IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow.
  37. NeedApproval bool // may need approval if it's a fork pull request
  38. ApprovedBy int64 `xorm:"index"` // who approved
  39. Event webhook_module.HookEventType // the webhook event that causes the workflow to run
  40. EventPayload string `xorm:"LONGTEXT"`
  41. TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
  42. Status Status `xorm:"index"`
  43. Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
  44. // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
  45. Started timeutil.TimeStamp
  46. Stopped timeutil.TimeStamp
  47. // PreviousDuration is used for recording previous duration
  48. PreviousDuration time.Duration
  49. Created timeutil.TimeStamp `xorm:"created"`
  50. Updated timeutil.TimeStamp `xorm:"updated"`
  51. }
  52. func init() {
  53. db.RegisterModel(new(ActionRun))
  54. db.RegisterModel(new(ActionRunIndex))
  55. }
  56. func (run *ActionRun) HTMLURL() string {
  57. if run.Repo == nil {
  58. return ""
  59. }
  60. return fmt.Sprintf("%s/actions/runs/%d", run.Repo.HTMLURL(), run.Index)
  61. }
  62. func (run *ActionRun) Link() string {
  63. if run.Repo == nil {
  64. return ""
  65. }
  66. return fmt.Sprintf("%s/actions/runs/%d", run.Repo.Link(), run.Index)
  67. }
  68. // RefLink return the url of run's ref
  69. func (run *ActionRun) RefLink() string {
  70. refName := git.RefName(run.Ref)
  71. if refName.IsPull() {
  72. return run.Repo.Link() + "/pulls/" + refName.ShortName()
  73. }
  74. return git.RefURL(run.Repo.Link(), run.Ref)
  75. }
  76. // PrettyRef return #id for pull ref or ShortName for others
  77. func (run *ActionRun) PrettyRef() string {
  78. refName := git.RefName(run.Ref)
  79. if refName.IsPull() {
  80. return "#" + strings.TrimSuffix(strings.TrimPrefix(run.Ref, git.PullPrefix), "/head")
  81. }
  82. return refName.ShortName()
  83. }
  84. // LoadAttributes load Repo TriggerUser if not loaded
  85. func (run *ActionRun) LoadAttributes(ctx context.Context) error {
  86. if run == nil {
  87. return nil
  88. }
  89. if run.Repo == nil {
  90. repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
  91. if err != nil {
  92. return err
  93. }
  94. run.Repo = repo
  95. }
  96. if err := run.Repo.LoadAttributes(ctx); err != nil {
  97. return err
  98. }
  99. if run.TriggerUser == nil {
  100. u, err := user_model.GetPossibleUserByID(ctx, run.TriggerUserID)
  101. if err != nil {
  102. return err
  103. }
  104. run.TriggerUser = u
  105. }
  106. return nil
  107. }
  108. func (run *ActionRun) Duration() time.Duration {
  109. return calculateDuration(run.Started, run.Stopped, run.Status) + run.PreviousDuration
  110. }
  111. func (run *ActionRun) GetPushEventPayload() (*api.PushPayload, error) {
  112. if run.Event == webhook_module.HookEventPush {
  113. var payload api.PushPayload
  114. if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil {
  115. return nil, err
  116. }
  117. return &payload, nil
  118. }
  119. return nil, fmt.Errorf("event %s is not a push event", run.Event)
  120. }
  121. func (run *ActionRun) GetPullRequestEventPayload() (*api.PullRequestPayload, error) {
  122. if run.Event == webhook_module.HookEventPullRequest || run.Event == webhook_module.HookEventPullRequestSync {
  123. var payload api.PullRequestPayload
  124. if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil {
  125. return nil, err
  126. }
  127. return &payload, nil
  128. }
  129. return nil, fmt.Errorf("event %s is not a pull request event", run.Event)
  130. }
  131. func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
  132. _, err := db.GetEngine(ctx).ID(repo.ID).
  133. SetExpr("num_action_runs",
  134. builder.Select("count(*)").From("action_run").
  135. Where(builder.Eq{"repo_id": repo.ID}),
  136. ).
  137. SetExpr("num_closed_action_runs",
  138. builder.Select("count(*)").From("action_run").
  139. Where(builder.Eq{
  140. "repo_id": repo.ID,
  141. }.And(
  142. builder.In("status",
  143. StatusSuccess,
  144. StatusFailure,
  145. StatusCancelled,
  146. StatusSkipped,
  147. ),
  148. ),
  149. ),
  150. ).
  151. Update(repo)
  152. return err
  153. }
  154. // CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
  155. // It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
  156. func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
  157. // Find all runs in the specified repository, reference, and workflow with non-final status
  158. runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
  159. RepoID: repoID,
  160. Ref: ref,
  161. WorkflowID: workflowID,
  162. TriggerEvent: event,
  163. Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
  164. })
  165. if err != nil {
  166. return err
  167. }
  168. // If there are no runs found, there's no need to proceed with cancellation, so return nil.
  169. if total == 0 {
  170. return nil
  171. }
  172. // Iterate over each found run and cancel its associated jobs.
  173. for _, run := range runs {
  174. // Find all jobs associated with the current run.
  175. jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
  176. RunID: run.ID,
  177. })
  178. if err != nil {
  179. return err
  180. }
  181. // Iterate over each job and attempt to cancel it.
  182. for _, job := range jobs {
  183. // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
  184. status := job.Status
  185. if status.IsDone() {
  186. continue
  187. }
  188. // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
  189. if job.TaskID == 0 {
  190. job.Status = StatusCancelled
  191. job.Stopped = timeutil.TimeStampNow()
  192. // Update the job's status and stopped time in the database.
  193. n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
  194. if err != nil {
  195. return err
  196. }
  197. // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
  198. if n == 0 {
  199. return fmt.Errorf("job has changed, try again")
  200. }
  201. // Continue with the next job.
  202. continue
  203. }
  204. // If the job has an associated task, try to stop the task, effectively cancelling the job.
  205. if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
  206. return err
  207. }
  208. }
  209. }
  210. // Return nil to indicate successful cancellation of all running and waiting jobs.
  211. return nil
  212. }
  213. // InsertRun inserts a run
  214. func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
  215. ctx, commiter, err := db.TxContext(ctx)
  216. if err != nil {
  217. return err
  218. }
  219. defer commiter.Close()
  220. index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
  221. if err != nil {
  222. return err
  223. }
  224. run.Index = index
  225. if err := db.Insert(ctx, run); err != nil {
  226. return err
  227. }
  228. if run.Repo == nil {
  229. repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
  230. if err != nil {
  231. return err
  232. }
  233. run.Repo = repo
  234. }
  235. if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
  236. return err
  237. }
  238. runJobs := make([]*ActionRunJob, 0, len(jobs))
  239. var hasWaiting bool
  240. for _, v := range jobs {
  241. id, job := v.Job()
  242. needs := job.Needs()
  243. if err := v.SetJob(id, job.EraseNeeds()); err != nil {
  244. return err
  245. }
  246. payload, _ := v.Marshal()
  247. status := StatusWaiting
  248. if len(needs) > 0 || run.NeedApproval {
  249. status = StatusBlocked
  250. } else {
  251. hasWaiting = true
  252. }
  253. job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
  254. runJobs = append(runJobs, &ActionRunJob{
  255. RunID: run.ID,
  256. RepoID: run.RepoID,
  257. OwnerID: run.OwnerID,
  258. CommitSHA: run.CommitSHA,
  259. IsForkPullRequest: run.IsForkPullRequest,
  260. Name: job.Name,
  261. WorkflowPayload: payload,
  262. JobID: id,
  263. Needs: needs,
  264. RunsOn: job.RunsOn(),
  265. Status: status,
  266. })
  267. }
  268. if err := db.Insert(ctx, runJobs); err != nil {
  269. return err
  270. }
  271. // if there is a job in the waiting status, increase tasks version.
  272. if hasWaiting {
  273. if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
  274. return err
  275. }
  276. }
  277. return commiter.Commit()
  278. }
  279. func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
  280. var run ActionRun
  281. has, err := db.GetEngine(ctx).Where("id=?", id).Get(&run)
  282. if err != nil {
  283. return nil, err
  284. } else if !has {
  285. return nil, fmt.Errorf("run with id %d: %w", id, util.ErrNotExist)
  286. }
  287. return &run, nil
  288. }
  289. func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error) {
  290. run := &ActionRun{
  291. RepoID: repoID,
  292. Index: index,
  293. }
  294. has, err := db.GetEngine(ctx).Get(run)
  295. if err != nil {
  296. return nil, err
  297. } else if !has {
  298. return nil, fmt.Errorf("run with index %d %d: %w", repoID, index, util.ErrNotExist)
  299. }
  300. return run, nil
  301. }
  302. func GetWorkflowLatestRun(ctx context.Context, repoID int64, workflowFile, branch, event string) (*ActionRun, error) {
  303. var run ActionRun
  304. q := db.GetEngine(ctx).Where("repo_id=?", repoID).
  305. And("ref = ?", branch).
  306. And("workflow_id = ?", workflowFile)
  307. if event != "" {
  308. q.And("event = ?", event)
  309. }
  310. has, err := q.Desc("id").Get(&run)
  311. if err != nil {
  312. return nil, err
  313. } else if !has {
  314. return nil, util.NewNotExistErrorf("run with repo_id %d, ref %s, workflow_id %s", repoID, branch, workflowFile)
  315. }
  316. return &run, nil
  317. }
  318. // UpdateRun updates a run.
  319. // It requires the inputted run has Version set.
  320. // It will return error if the version is not matched (it means the run has been changed after loaded).
  321. func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
  322. sess := db.GetEngine(ctx).ID(run.ID)
  323. if len(cols) > 0 {
  324. sess.Cols(cols...)
  325. }
  326. affected, err := sess.Update(run)
  327. if err != nil {
  328. return err
  329. }
  330. if affected == 0 {
  331. return fmt.Errorf("run has changed")
  332. // It's impossible that the run is not found, since Gitea never deletes runs.
  333. }
  334. if run.Status != 0 || slices.Contains(cols, "status") {
  335. if run.RepoID == 0 {
  336. run, err = GetRunByID(ctx, run.ID)
  337. if err != nil {
  338. return err
  339. }
  340. }
  341. if run.Repo == nil {
  342. repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
  343. if err != nil {
  344. return err
  345. }
  346. run.Repo = repo
  347. }
  348. if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
  349. return err
  350. }
  351. }
  352. return nil
  353. }
  354. type ActionRunIndex db.ResourceIndex