aboutsummaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2023-08-24 11:06:51 +0800
committerGitHub <noreply@github.com>2023-08-24 03:06:51 +0000
commit0d55f64e6cd3de2e1e5c0ee795605823efb14231 (patch)
tree7098b87f2a624905ade7f2e3d4af1ca0327327e1 /services
parentb62c8e7765a371600a300f62da96483a1ae0c731 (diff)
downloadgitea-0d55f64e6cd3de2e1e5c0ee795605823efb14231.tar.gz
gitea-0d55f64e6cd3de2e1e5c0ee795605823efb14231.zip
chore(actions): support cron schedule task (#26655)
Replace #22751 1. only support the default branch in the repository setting. 2. autoload schedule data from the schedule table after starting the service. 3. support specific syntax like `@yearly`, `@monthly`, `@weekly`, `@daily`, `@hourly` ## How to use See the [GitHub Actions document](https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule) for getting more detailed information. ```yaml on: schedule: - cron: '30 5 * * 1,3' - cron: '30 5 * * 2,4' jobs: test_schedule: runs-on: ubuntu-latest steps: - name: Not on Monday or Wednesday if: github.event.schedule != '30 5 * * 1,3' run: echo "This step will be skipped on Monday and Wednesday" - name: Every time run: echo "This step will always run" ``` Signed-off-by: Bo-Yi.Wu <appleboy.tw@gmail.com> --------- Co-authored-by: Jason Song <i@wolfogre.com> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'services')
-rw-r--r--services/actions/notifier_helper.go108
-rw-r--r--services/actions/schedule_tasks.go135
-rw-r--r--services/cron/tasks_actions.go14
3 files changed, 253 insertions, 4 deletions
diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go
index 75c99ff19c..ff00e48c64 100644
--- a/services/actions/notifier_helper.go
+++ b/services/actions/notifier_helper.go
@@ -4,6 +4,7 @@
package actions
import (
+ "bytes"
"context"
"fmt"
"strings"
@@ -24,6 +25,7 @@ import (
"code.gitea.io/gitea/services/convert"
"github.com/nektos/act/pkg/jobparser"
+ "github.com/nektos/act/pkg/model"
)
var methodCtxKey struct{}
@@ -143,15 +145,15 @@ func notify(ctx context.Context, input *notifyInput) error {
}
var detectedWorkflows []*actions_module.DetectedWorkflow
- workflows, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
+ actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
+ workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
+
if len(workflows) == 0 {
log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
} else {
- actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
-
for _, wf := range workflows {
if actionsConfig.IsWorkflowDisabled(wf.EntryName) {
log.Trace("repo %s has disable workflows %s", input.Repo.RepoPath(), wf.EntryName)
@@ -171,7 +173,7 @@ func notify(ctx context.Context, input *notifyInput) error {
if err != nil {
return fmt.Errorf("gitRepo.GetCommit: %w", err)
}
- baseWorkflows, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
+ baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
@@ -186,7 +188,22 @@ func notify(ctx context.Context, input *notifyInput) error {
}
}
+ if err := handleSchedules(ctx, schedules, commit, input); err != nil {
+ return err
+ }
+
+ return handleWorkflows(ctx, detectedWorkflows, commit, input, ref)
+}
+
+func handleWorkflows(
+ ctx context.Context,
+ detectedWorkflows []*actions_module.DetectedWorkflow,
+ commit *git.Commit,
+ input *notifyInput,
+ ref string,
+) error {
if len(detectedWorkflows) == 0 {
+ log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
return nil
}
@@ -350,3 +367,86 @@ func ifNeedApproval(ctx context.Context, run *actions_model.ActionRun, repo *rep
log.Trace("need approval because it's the first time user %d triggered actions", user.ID)
return true, nil
}
+
+func handleSchedules(
+ ctx context.Context,
+ detectedWorkflows []*actions_module.DetectedWorkflow,
+ commit *git.Commit,
+ input *notifyInput,
+) error {
+ if len(detectedWorkflows) == 0 {
+ log.Trace("repo %s with commit %s couldn't find schedules", input.Repo.RepoPath(), commit.ID)
+ return nil
+ }
+
+ branch, err := commit.GetBranchName()
+ if err != nil {
+ return err
+ }
+ if branch != input.Repo.DefaultBranch {
+ log.Trace("commit branch is not default branch in repo")
+ return nil
+ }
+
+ rows, _, err := actions_model.FindSchedules(ctx, actions_model.FindScheduleOptions{RepoID: input.Repo.ID})
+ if err != nil {
+ log.Error("FindCrons: %v", err)
+ return err
+ }
+
+ if len(rows) > 0 {
+ if err := actions_model.DeleteScheduleTaskByRepo(ctx, input.Repo.ID); err != nil {
+ log.Error("DeleteCronTaskByRepo: %v", err)
+ }
+ }
+
+ p, err := json.Marshal(input.Payload)
+ if err != nil {
+ return fmt.Errorf("json.Marshal: %w", err)
+ }
+
+ crons := make([]*actions_model.ActionSchedule, 0, len(detectedWorkflows))
+ for _, dwf := range detectedWorkflows {
+ // Check cron job condition. Only working in default branch
+ workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content))
+ if err != nil {
+ log.Error("ReadWorkflow: %v", err)
+ continue
+ }
+ schedules := workflow.OnSchedule()
+ if len(schedules) == 0 {
+ log.Warn("no schedule event")
+ continue
+ }
+
+ run := &actions_model.ActionSchedule{
+ Title: strings.SplitN(commit.CommitMessage, "\n", 2)[0],
+ RepoID: input.Repo.ID,
+ OwnerID: input.Repo.OwnerID,
+ WorkflowID: dwf.EntryName,
+ TriggerUserID: input.Doer.ID,
+ Ref: input.Ref,
+ CommitSHA: commit.ID.String(),
+ Event: input.Event,
+ EventPayload: string(p),
+ Specs: schedules,
+ Content: dwf.Content,
+ }
+
+ // cancel running jobs if the event is push
+ if run.Event == webhook_module.HookEventPush {
+ // cancel running jobs of the same workflow
+ if err := actions_model.CancelRunningJobs(
+ ctx,
+ run.RepoID,
+ run.Ref,
+ run.WorkflowID,
+ ); err != nil {
+ log.Error("CancelRunningJobs: %v", err)
+ }
+ }
+ crons = append(crons, run)
+ }
+
+ return actions_model.CreateScheduleTask(ctx, crons)
+}
diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go
new file mode 100644
index 0000000000..87131e0aab
--- /dev/null
+++ b/services/actions/schedule_tasks.go
@@ -0,0 +1,135 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ actions_model "code.gitea.io/gitea/models/actions"
+ "code.gitea.io/gitea/models/db"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/timeutil"
+ webhook_module "code.gitea.io/gitea/modules/webhook"
+
+ "github.com/nektos/act/pkg/jobparser"
+)
+
+// StartScheduleTasks start the task
+func StartScheduleTasks(ctx context.Context) error {
+ return startTasks(ctx)
+}
+
+// startTasks retrieves specifications in pages, creates a schedule task for each specification,
+// and updates the specification's next run time and previous run time.
+// The function returns an error if there's an issue with finding or updating the specifications.
+func startTasks(ctx context.Context) error {
+ // Set the page size
+ pageSize := 50
+
+ // Retrieve specs in pages until all specs have been retrieved
+ now := time.Now()
+ for page := 1; ; page++ {
+ // Retrieve the specs for the current page
+ specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
+ ListOptions: db.ListOptions{
+ Page: page,
+ PageSize: pageSize,
+ },
+ Next: now.Unix(),
+ })
+ if err != nil {
+ return fmt.Errorf("find specs: %w", err)
+ }
+
+ // Loop through each spec and create a schedule task for it
+ for _, row := range specs {
+ // cancel running jobs if the event is push
+ if row.Schedule.Event == webhook_module.HookEventPush {
+ // cancel running jobs of the same workflow
+ if err := actions_model.CancelRunningJobs(
+ ctx,
+ row.RepoID,
+ row.Schedule.Ref,
+ row.Schedule.WorkflowID,
+ ); err != nil {
+ log.Error("CancelRunningJobs: %v", err)
+ }
+ }
+
+ if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
+ log.Error("CreateScheduleTask: %v", err)
+ return err
+ }
+
+ // Parse the spec
+ schedule, err := row.Parse()
+ if err != nil {
+ log.Error("Parse: %v", err)
+ return err
+ }
+
+ // Update the spec's next run time and previous run time
+ row.Prev = row.Next
+ row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
+ if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
+ log.Error("UpdateScheduleSpec: %v", err)
+ return err
+ }
+ }
+
+ // Stop if all specs have been retrieved
+ if len(specs) < pageSize {
+ break
+ }
+ }
+
+ return nil
+}
+
+// CreateScheduleTask creates a scheduled task from a cron action schedule.
+// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
+func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
+ // Create a new action run based on the schedule
+ run := &actions_model.ActionRun{
+ Title: cron.Title,
+ RepoID: cron.RepoID,
+ OwnerID: cron.OwnerID,
+ WorkflowID: cron.WorkflowID,
+ TriggerUserID: cron.TriggerUserID,
+ Ref: cron.Ref,
+ CommitSHA: cron.CommitSHA,
+ Event: cron.Event,
+ EventPayload: cron.EventPayload,
+ Status: actions_model.StatusWaiting,
+ }
+
+ // Parse the workflow specification from the cron schedule
+ workflows, err := jobparser.Parse(cron.Content)
+ if err != nil {
+ return err
+ }
+
+ // Insert the action run and its associated jobs into the database
+ if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
+ return err
+ }
+
+ // Retrieve the jobs for the newly created action run
+ jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID})
+ if err != nil {
+ return err
+ }
+
+ // Create commit statuses for each job
+ for _, job := range jobs {
+ if err := createCommitStatus(ctx, job); err != nil {
+ return err
+ }
+ }
+
+ // Return nil if no errors occurred
+ return nil
+}
diff --git a/services/cron/tasks_actions.go b/services/cron/tasks_actions.go
index 30e8749a5e..0875792503 100644
--- a/services/cron/tasks_actions.go
+++ b/services/cron/tasks_actions.go
@@ -18,6 +18,7 @@ func initActionsTasks() {
registerStopZombieTasks()
registerStopEndlessTasks()
registerCancelAbandonedJobs()
+ registerScheduleTasks()
}
func registerStopZombieTasks() {
@@ -49,3 +50,16 @@ func registerCancelAbandonedJobs() {
return actions_service.CancelAbandonedJobs(ctx)
})
}
+
+// registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks.
+func registerScheduleTasks() {
+ // Register the task with a unique name, enabled status, and schedule for every minute.
+ RegisterTaskFatal("start_schedule_tasks", &BaseConfig{
+ Enabled: true,
+ RunAtStart: false,
+ Schedule: "@every 1m",
+ }, func(ctx context.Context, _ *user_model.User, cfg Config) error {
+ // Call the function to start schedule tasks and pass the context.
+ return actions_service.StartScheduleTasks(ctx)
+ })
+}