You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

schedule_tasks.go 4.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. actions_model "code.gitea.io/gitea/models/actions"
  9. "code.gitea.io/gitea/models/db"
  10. repo_model "code.gitea.io/gitea/models/repo"
  11. "code.gitea.io/gitea/models/unit"
  12. "code.gitea.io/gitea/modules/log"
  13. "code.gitea.io/gitea/modules/timeutil"
  14. webhook_module "code.gitea.io/gitea/modules/webhook"
  15. "github.com/nektos/act/pkg/jobparser"
  16. )
  17. // StartScheduleTasks start the task
  18. func StartScheduleTasks(ctx context.Context) error {
  19. return startTasks(ctx)
  20. }
  21. // startTasks retrieves specifications in pages, creates a schedule task for each specification,
  22. // and updates the specification's next run time and previous run time.
  23. // The function returns an error if there's an issue with finding or updating the specifications.
  24. func startTasks(ctx context.Context) error {
  25. // Set the page size
  26. pageSize := 50
  27. // Retrieve specs in pages until all specs have been retrieved
  28. now := time.Now()
  29. for page := 1; ; page++ {
  30. // Retrieve the specs for the current page
  31. specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
  32. ListOptions: db.ListOptions{
  33. Page: page,
  34. PageSize: pageSize,
  35. },
  36. Next: now.Unix(),
  37. })
  38. if err != nil {
  39. return fmt.Errorf("find specs: %w", err)
  40. }
  41. if err := specs.LoadRepos(); err != nil {
  42. return fmt.Errorf("LoadRepos: %w", err)
  43. }
  44. // Loop through each spec and create a schedule task for it
  45. for _, row := range specs {
  46. // cancel running jobs if the event is push
  47. if row.Schedule.Event == webhook_module.HookEventPush {
  48. // cancel running jobs of the same workflow
  49. if err := actions_model.CancelRunningJobs(
  50. ctx,
  51. row.RepoID,
  52. row.Schedule.Ref,
  53. row.Schedule.WorkflowID,
  54. webhook_module.HookEventSchedule,
  55. ); err != nil {
  56. log.Error("CancelRunningJobs: %v", err)
  57. }
  58. }
  59. cfg, err := row.Repo.GetUnit(ctx, unit.TypeActions)
  60. if err != nil {
  61. if repo_model.IsErrUnitTypeNotExist(err) {
  62. // Skip the actions unit of this repo is disabled.
  63. continue
  64. }
  65. return fmt.Errorf("GetUnit: %w", err)
  66. }
  67. if cfg.ActionsConfig().IsWorkflowDisabled(row.Schedule.WorkflowID) {
  68. continue
  69. }
  70. if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
  71. log.Error("CreateScheduleTask: %v", err)
  72. return err
  73. }
  74. // Parse the spec
  75. schedule, err := row.Parse()
  76. if err != nil {
  77. log.Error("Parse: %v", err)
  78. return err
  79. }
  80. // Update the spec's next run time and previous run time
  81. row.Prev = row.Next
  82. row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
  83. if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
  84. log.Error("UpdateScheduleSpec: %v", err)
  85. return err
  86. }
  87. }
  88. // Stop if all specs have been retrieved
  89. if len(specs) < pageSize {
  90. break
  91. }
  92. }
  93. return nil
  94. }
  95. // CreateScheduleTask creates a scheduled task from a cron action schedule.
  96. // It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
  97. func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
  98. // Create a new action run based on the schedule
  99. run := &actions_model.ActionRun{
  100. Title: cron.Title,
  101. RepoID: cron.RepoID,
  102. OwnerID: cron.OwnerID,
  103. WorkflowID: cron.WorkflowID,
  104. TriggerUserID: cron.TriggerUserID,
  105. Ref: cron.Ref,
  106. CommitSHA: cron.CommitSHA,
  107. Event: cron.Event,
  108. EventPayload: cron.EventPayload,
  109. TriggerEvent: string(webhook_module.HookEventSchedule),
  110. ScheduleID: cron.ID,
  111. Status: actions_model.StatusWaiting,
  112. }
  113. // Parse the workflow specification from the cron schedule
  114. workflows, err := jobparser.Parse(cron.Content)
  115. if err != nil {
  116. return err
  117. }
  118. // Insert the action run and its associated jobs into the database
  119. if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
  120. return err
  121. }
  122. // Return nil if no errors occurred
  123. return nil
  124. }