diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2023-08-24 11:06:51 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-24 03:06:51 +0000 |
commit | 0d55f64e6cd3de2e1e5c0ee795605823efb14231 (patch) | |
tree | 7098b87f2a624905ade7f2e3d4af1ca0327327e1 /services | |
parent | b62c8e7765a371600a300f62da96483a1ae0c731 (diff) | |
download | gitea-0d55f64e6cd3de2e1e5c0ee795605823efb14231.tar.gz gitea-0d55f64e6cd3de2e1e5c0ee795605823efb14231.zip |
chore(actions): support cron schedule task (#26655)
Replace #22751
1. only support the default branch in the repository setting.
2. autoload schedule data from the schedule table after starting the
service.
3. support specific syntax like `@yearly`, `@monthly`, `@weekly`,
`@daily`, `@hourly`
## How to use
See the [GitHub Actions
document](https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule)
for getting more detailed information.
```yaml
on:
schedule:
- cron: '30 5 * * 1,3'
- cron: '30 5 * * 2,4'
jobs:
test_schedule:
runs-on: ubuntu-latest
steps:
- name: Not on Monday or Wednesday
if: github.event.schedule != '30 5 * * 1,3'
run: echo "This step will be skipped on Monday and Wednesday"
- name: Every time
run: echo "This step will always run"
```
Signed-off-by: Bo-Yi.Wu <appleboy.tw@gmail.com>
---------
Co-authored-by: Jason Song <i@wolfogre.com>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'services')
-rw-r--r-- | services/actions/notifier_helper.go | 108 | ||||
-rw-r--r-- | services/actions/schedule_tasks.go | 135 | ||||
-rw-r--r-- | services/cron/tasks_actions.go | 14 |
3 files changed, 253 insertions, 4 deletions
diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 75c99ff19c..ff00e48c64 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -4,6 +4,7 @@ package actions import ( + "bytes" "context" "fmt" "strings" @@ -24,6 +25,7 @@ import ( "code.gitea.io/gitea/services/convert" "github.com/nektos/act/pkg/jobparser" + "github.com/nektos/act/pkg/model" ) var methodCtxKey struct{} @@ -143,15 +145,15 @@ func notify(ctx context.Context, input *notifyInput) error { } var detectedWorkflows []*actions_module.DetectedWorkflow - workflows, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload) + actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig() + workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload) if err != nil { return fmt.Errorf("DetectWorkflows: %w", err) } + if len(workflows) == 0 { log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID) } else { - actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig() - for _, wf := range workflows { if actionsConfig.IsWorkflowDisabled(wf.EntryName) { log.Trace("repo %s has disable workflows %s", input.Repo.RepoPath(), wf.EntryName) @@ -171,7 +173,7 @@ func notify(ctx context.Context, input *notifyInput) error { if err != nil { return fmt.Errorf("gitRepo.GetCommit: %w", err) } - baseWorkflows, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload) + baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload) if err != nil { return fmt.Errorf("DetectWorkflows: %w", err) } @@ -186,7 +188,22 @@ func notify(ctx context.Context, input *notifyInput) error { } } + if err := handleSchedules(ctx, schedules, commit, input); err != nil { + return err + } + + return handleWorkflows(ctx, detectedWorkflows, commit, input, ref) +} + +func handleWorkflows( + ctx context.Context, + detectedWorkflows []*actions_module.DetectedWorkflow, + commit *git.Commit, + input *notifyInput, + ref string, +) error { if len(detectedWorkflows) == 0 { + log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID) return nil } @@ -350,3 +367,86 @@ func ifNeedApproval(ctx context.Context, run *actions_model.ActionRun, repo *rep log.Trace("need approval because it's the first time user %d triggered actions", user.ID) return true, nil } + +func handleSchedules( + ctx context.Context, + detectedWorkflows []*actions_module.DetectedWorkflow, + commit *git.Commit, + input *notifyInput, +) error { + if len(detectedWorkflows) == 0 { + log.Trace("repo %s with commit %s couldn't find schedules", input.Repo.RepoPath(), commit.ID) + return nil + } + + branch, err := commit.GetBranchName() + if err != nil { + return err + } + if branch != input.Repo.DefaultBranch { + log.Trace("commit branch is not default branch in repo") + return nil + } + + rows, _, err := actions_model.FindSchedules(ctx, actions_model.FindScheduleOptions{RepoID: input.Repo.ID}) + if err != nil { + log.Error("FindCrons: %v", err) + return err + } + + if len(rows) > 0 { + if err := actions_model.DeleteScheduleTaskByRepo(ctx, input.Repo.ID); err != nil { + log.Error("DeleteCronTaskByRepo: %v", err) + } + } + + p, err := json.Marshal(input.Payload) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + crons := make([]*actions_model.ActionSchedule, 0, len(detectedWorkflows)) + for _, dwf := range detectedWorkflows { + // Check cron job condition. Only working in default branch + workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content)) + if err != nil { + log.Error("ReadWorkflow: %v", err) + continue + } + schedules := workflow.OnSchedule() + if len(schedules) == 0 { + log.Warn("no schedule event") + continue + } + + run := &actions_model.ActionSchedule{ + Title: strings.SplitN(commit.CommitMessage, "\n", 2)[0], + RepoID: input.Repo.ID, + OwnerID: input.Repo.OwnerID, + WorkflowID: dwf.EntryName, + TriggerUserID: input.Doer.ID, + Ref: input.Ref, + CommitSHA: commit.ID.String(), + Event: input.Event, + EventPayload: string(p), + Specs: schedules, + Content: dwf.Content, + } + + // cancel running jobs if the event is push + if run.Event == webhook_module.HookEventPush { + // cancel running jobs of the same workflow + if err := actions_model.CancelRunningJobs( + ctx, + run.RepoID, + run.Ref, + run.WorkflowID, + ); err != nil { + log.Error("CancelRunningJobs: %v", err) + } + } + crons = append(crons, run) + } + + return actions_model.CreateScheduleTask(ctx, crons) +} diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go new file mode 100644 index 0000000000..87131e0aab --- /dev/null +++ b/services/actions/schedule_tasks.go @@ -0,0 +1,135 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" + webhook_module "code.gitea.io/gitea/modules/webhook" + + "github.com/nektos/act/pkg/jobparser" +) + +// StartScheduleTasks start the task +func StartScheduleTasks(ctx context.Context) error { + return startTasks(ctx) +} + +// startTasks retrieves specifications in pages, creates a schedule task for each specification, +// and updates the specification's next run time and previous run time. +// The function returns an error if there's an issue with finding or updating the specifications. +func startTasks(ctx context.Context) error { + // Set the page size + pageSize := 50 + + // Retrieve specs in pages until all specs have been retrieved + now := time.Now() + for page := 1; ; page++ { + // Retrieve the specs for the current page + specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{ + ListOptions: db.ListOptions{ + Page: page, + PageSize: pageSize, + }, + Next: now.Unix(), + }) + if err != nil { + return fmt.Errorf("find specs: %w", err) + } + + // Loop through each spec and create a schedule task for it + for _, row := range specs { + // cancel running jobs if the event is push + if row.Schedule.Event == webhook_module.HookEventPush { + // cancel running jobs of the same workflow + if err := actions_model.CancelRunningJobs( + ctx, + row.RepoID, + row.Schedule.Ref, + row.Schedule.WorkflowID, + ); err != nil { + log.Error("CancelRunningJobs: %v", err) + } + } + + if err := CreateScheduleTask(ctx, row.Schedule); err != nil { + log.Error("CreateScheduleTask: %v", err) + return err + } + + // Parse the spec + schedule, err := row.Parse() + if err != nil { + log.Error("Parse: %v", err) + return err + } + + // Update the spec's next run time and previous run time + row.Prev = row.Next + row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix()) + if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil { + log.Error("UpdateScheduleSpec: %v", err) + return err + } + } + + // Stop if all specs have been retrieved + if len(specs) < pageSize { + break + } + } + + return nil +} + +// CreateScheduleTask creates a scheduled task from a cron action schedule. +// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job. +func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error { + // Create a new action run based on the schedule + run := &actions_model.ActionRun{ + Title: cron.Title, + RepoID: cron.RepoID, + OwnerID: cron.OwnerID, + WorkflowID: cron.WorkflowID, + TriggerUserID: cron.TriggerUserID, + Ref: cron.Ref, + CommitSHA: cron.CommitSHA, + Event: cron.Event, + EventPayload: cron.EventPayload, + Status: actions_model.StatusWaiting, + } + + // Parse the workflow specification from the cron schedule + workflows, err := jobparser.Parse(cron.Content) + if err != nil { + return err + } + + // Insert the action run and its associated jobs into the database + if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + return err + } + + // Retrieve the jobs for the newly created action run + jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return err + } + + // Create commit statuses for each job + for _, job := range jobs { + if err := createCommitStatus(ctx, job); err != nil { + return err + } + } + + // Return nil if no errors occurred + return nil +} diff --git a/services/cron/tasks_actions.go b/services/cron/tasks_actions.go index 30e8749a5e..0875792503 100644 --- a/services/cron/tasks_actions.go +++ b/services/cron/tasks_actions.go @@ -18,6 +18,7 @@ func initActionsTasks() { registerStopZombieTasks() registerStopEndlessTasks() registerCancelAbandonedJobs() + registerScheduleTasks() } func registerStopZombieTasks() { @@ -49,3 +50,16 @@ func registerCancelAbandonedJobs() { return actions_service.CancelAbandonedJobs(ctx) }) } + +// registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks. +func registerScheduleTasks() { + // Register the task with a unique name, enabled status, and schedule for every minute. + RegisterTaskFatal("start_schedule_tasks", &BaseConfig{ + Enabled: true, + RunAtStart: false, + Schedule: "@every 1m", + }, func(ctx context.Context, _ *user_model.User, cfg Config) error { + // Call the function to start schedule tasks and pass the context. + return actions_service.StartScheduleTasks(ctx) + }) +} |