You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

schedule_tasks.go 3.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. actions_model "code.gitea.io/gitea/models/actions"
  9. "code.gitea.io/gitea/models/db"
  10. "code.gitea.io/gitea/models/unit"
  11. "code.gitea.io/gitea/modules/log"
  12. "code.gitea.io/gitea/modules/timeutil"
  13. webhook_module "code.gitea.io/gitea/modules/webhook"
  14. "github.com/nektos/act/pkg/jobparser"
  15. )
  16. // StartScheduleTasks start the task
  17. func StartScheduleTasks(ctx context.Context) error {
  18. return startTasks(ctx)
  19. }
  20. // startTasks retrieves specifications in pages, creates a schedule task for each specification,
  21. // and updates the specification's next run time and previous run time.
  22. // The function returns an error if there's an issue with finding or updating the specifications.
  23. func startTasks(ctx context.Context) error {
  24. // Set the page size
  25. pageSize := 50
  26. // Retrieve specs in pages until all specs have been retrieved
  27. now := time.Now()
  28. for page := 1; ; page++ {
  29. // Retrieve the specs for the current page
  30. specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
  31. ListOptions: db.ListOptions{
  32. Page: page,
  33. PageSize: pageSize,
  34. },
  35. Next: now.Unix(),
  36. })
  37. if err != nil {
  38. return fmt.Errorf("find specs: %w", err)
  39. }
  40. if err := specs.LoadRepos(); err != nil {
  41. return fmt.Errorf("LoadRepos: %w", err)
  42. }
  43. // Loop through each spec and create a schedule task for it
  44. for _, row := range specs {
  45. // cancel running jobs if the event is push
  46. if row.Schedule.Event == webhook_module.HookEventPush {
  47. // cancel running jobs of the same workflow
  48. if err := actions_model.CancelRunningJobs(
  49. ctx,
  50. row.RepoID,
  51. row.Schedule.Ref,
  52. row.Schedule.WorkflowID,
  53. webhook_module.HookEventSchedule,
  54. ); err != nil {
  55. log.Error("CancelRunningJobs: %v", err)
  56. }
  57. }
  58. cfg := row.Repo.MustGetUnit(ctx, unit.TypeActions).ActionsConfig()
  59. if cfg.IsWorkflowDisabled(row.Schedule.WorkflowID) {
  60. continue
  61. }
  62. if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
  63. log.Error("CreateScheduleTask: %v", err)
  64. return err
  65. }
  66. // Parse the spec
  67. schedule, err := row.Parse()
  68. if err != nil {
  69. log.Error("Parse: %v", err)
  70. return err
  71. }
  72. // Update the spec's next run time and previous run time
  73. row.Prev = row.Next
  74. row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
  75. if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
  76. log.Error("UpdateScheduleSpec: %v", err)
  77. return err
  78. }
  79. }
  80. // Stop if all specs have been retrieved
  81. if len(specs) < pageSize {
  82. break
  83. }
  84. }
  85. return nil
  86. }
  87. // CreateScheduleTask creates a scheduled task from a cron action schedule.
  88. // It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
  89. func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
  90. // Create a new action run based on the schedule
  91. run := &actions_model.ActionRun{
  92. Title: cron.Title,
  93. RepoID: cron.RepoID,
  94. OwnerID: cron.OwnerID,
  95. WorkflowID: cron.WorkflowID,
  96. TriggerUserID: cron.TriggerUserID,
  97. Ref: cron.Ref,
  98. CommitSHA: cron.CommitSHA,
  99. Event: cron.Event,
  100. EventPayload: cron.EventPayload,
  101. TriggerEvent: string(webhook_module.HookEventSchedule),
  102. ScheduleID: cron.ID,
  103. Status: actions_model.StatusWaiting,
  104. }
  105. // Parse the workflow specification from the cron schedule
  106. workflows, err := jobparser.Parse(cron.Content)
  107. if err != nil {
  108. return err
  109. }
  110. // Insert the action run and its associated jobs into the database
  111. if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
  112. return err
  113. }
  114. // Return nil if no errors occurred
  115. return nil
  116. }