You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

schedule_tasks.go 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. actions_model "code.gitea.io/gitea/models/actions"
  9. "code.gitea.io/gitea/models/db"
  10. "code.gitea.io/gitea/modules/log"
  11. "code.gitea.io/gitea/modules/timeutil"
  12. webhook_module "code.gitea.io/gitea/modules/webhook"
  13. "github.com/nektos/act/pkg/jobparser"
  14. )
  15. // StartScheduleTasks start the task
  16. func StartScheduleTasks(ctx context.Context) error {
  17. return startTasks(ctx)
  18. }
  19. // startTasks retrieves specifications in pages, creates a schedule task for each specification,
  20. // and updates the specification's next run time and previous run time.
  21. // The function returns an error if there's an issue with finding or updating the specifications.
  22. func startTasks(ctx context.Context) error {
  23. // Set the page size
  24. pageSize := 50
  25. // Retrieve specs in pages until all specs have been retrieved
  26. now := time.Now()
  27. for page := 1; ; page++ {
  28. // Retrieve the specs for the current page
  29. specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
  30. ListOptions: db.ListOptions{
  31. Page: page,
  32. PageSize: pageSize,
  33. },
  34. Next: now.Unix(),
  35. })
  36. if err != nil {
  37. return fmt.Errorf("find specs: %w", err)
  38. }
  39. // Loop through each spec and create a schedule task for it
  40. for _, row := range specs {
  41. // cancel running jobs if the event is push
  42. if row.Schedule.Event == webhook_module.HookEventPush {
  43. // cancel running jobs of the same workflow
  44. if err := actions_model.CancelRunningJobs(
  45. ctx,
  46. row.RepoID,
  47. row.Schedule.Ref,
  48. row.Schedule.WorkflowID,
  49. ); err != nil {
  50. log.Error("CancelRunningJobs: %v", err)
  51. }
  52. }
  53. if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
  54. log.Error("CreateScheduleTask: %v", err)
  55. return err
  56. }
  57. // Parse the spec
  58. schedule, err := row.Parse()
  59. if err != nil {
  60. log.Error("Parse: %v", err)
  61. return err
  62. }
  63. // Update the spec's next run time and previous run time
  64. row.Prev = row.Next
  65. row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
  66. if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
  67. log.Error("UpdateScheduleSpec: %v", err)
  68. return err
  69. }
  70. }
  71. // Stop if all specs have been retrieved
  72. if len(specs) < pageSize {
  73. break
  74. }
  75. }
  76. return nil
  77. }
  78. // CreateScheduleTask creates a scheduled task from a cron action schedule.
  79. // It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
  80. func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
  81. // Create a new action run based on the schedule
  82. run := &actions_model.ActionRun{
  83. Title: cron.Title,
  84. RepoID: cron.RepoID,
  85. OwnerID: cron.OwnerID,
  86. WorkflowID: cron.WorkflowID,
  87. TriggerUserID: cron.TriggerUserID,
  88. Ref: cron.Ref,
  89. CommitSHA: cron.CommitSHA,
  90. Event: cron.Event,
  91. EventPayload: cron.EventPayload,
  92. Status: actions_model.StatusWaiting,
  93. }
  94. // Parse the workflow specification from the cron schedule
  95. workflows, err := jobparser.Parse(cron.Content)
  96. if err != nil {
  97. return err
  98. }
  99. // Insert the action run and its associated jobs into the database
  100. if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
  101. return err
  102. }
  103. // Retrieve the jobs for the newly created action run
  104. jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID})
  105. if err != nil {
  106. return err
  107. }
  108. // Create commit statuses for each job
  109. for _, job := range jobs {
  110. if err := createCommitStatus(ctx, job); err != nil {
  111. return err
  112. }
  113. }
  114. // Return nil if no errors occurred
  115. return nil
  116. }