summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--models/actions/schedule.go120
-rw-r--r--models/actions/schedule_list.go94
-rw-r--r--models/actions/schedule_spec.go50
-rw-r--r--models/actions/schedule_spec_list.go106
-rw-r--r--models/migrations/migrations.go2
-rw-r--r--models/migrations/v1_21/v273.go45
-rw-r--r--models/repo.go2
-rw-r--r--modules/actions/workflows.go23
-rw-r--r--options/locale/locale_en-US.ini1
-rw-r--r--services/actions/notifier_helper.go108
-rw-r--r--services/actions/schedule_tasks.go135
-rw-r--r--services/cron/tasks_actions.go14
13 files changed, 693 insertions, 9 deletions
diff --git a/go.mod b/go.mod
index 968c0663e0..1996d29a84 100644
--- a/go.mod
+++ b/go.mod
@@ -90,6 +90,7 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/quasoft/websspi v1.1.2
github.com/redis/go-redis/v9 v9.0.5
+ github.com/robfig/cron/v3 v3.0.1
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/sassoftware/go-rpmutils v0.2.0
github.com/sergi/go-diff v1.3.1
@@ -254,7 +255,6 @@ require (
github.com/rhysd/actionlint v1.6.25 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/robfig/cron v1.2.0 // indirect
- github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
diff --git a/models/actions/schedule.go b/models/actions/schedule.go
new file mode 100644
index 0000000000..b0bc40dadc
--- /dev/null
+++ b/models/actions/schedule.go
@@ -0,0 +1,120 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+ "time"
+
+ "code.gitea.io/gitea/models/db"
+ repo_model "code.gitea.io/gitea/models/repo"
+ user_model "code.gitea.io/gitea/models/user"
+ "code.gitea.io/gitea/modules/timeutil"
+ webhook_module "code.gitea.io/gitea/modules/webhook"
+
+ "github.com/robfig/cron/v3"
+)
+
+// ActionSchedule represents a schedule of a workflow file
+type ActionSchedule struct {
+ ID int64
+ Title string
+ Specs []string
+ RepoID int64 `xorm:"index"`
+ Repo *repo_model.Repository `xorm:"-"`
+ OwnerID int64 `xorm:"index"`
+ WorkflowID string
+ TriggerUserID int64
+ TriggerUser *user_model.User `xorm:"-"`
+ Ref string
+ CommitSHA string
+ Event webhook_module.HookEventType
+ EventPayload string `xorm:"LONGTEXT"`
+ Content []byte
+ Created timeutil.TimeStamp `xorm:"created"`
+ Updated timeutil.TimeStamp `xorm:"updated"`
+}
+
+func init() {
+ db.RegisterModel(new(ActionSchedule))
+}
+
+// GetSchedulesMapByIDs returns the schedules by given id slice.
+func GetSchedulesMapByIDs(ids []int64) (map[int64]*ActionSchedule, error) {
+ schedules := make(map[int64]*ActionSchedule, len(ids))
+ return schedules, db.GetEngine(db.DefaultContext).In("id", ids).Find(&schedules)
+}
+
+// GetReposMapByIDs returns the repos by given id slice.
+func GetReposMapByIDs(ids []int64) (map[int64]*repo_model.Repository, error) {
+ repos := make(map[int64]*repo_model.Repository, len(ids))
+ return repos, db.GetEngine(db.DefaultContext).In("id", ids).Find(&repos)
+}
+
+var cronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
+
+// CreateScheduleTask creates new schedule task.
+func CreateScheduleTask(ctx context.Context, rows []*ActionSchedule) error {
+ // Return early if there are no rows to insert
+ if len(rows) == 0 {
+ return nil
+ }
+
+ // Begin transaction
+ ctx, committer, err := db.TxContext(ctx)
+ if err != nil {
+ return err
+ }
+ defer committer.Close()
+
+ // Loop through each schedule row
+ for _, row := range rows {
+ // Create new schedule row
+ if err = db.Insert(ctx, row); err != nil {
+ return err
+ }
+
+ // Loop through each schedule spec and create a new spec row
+ now := time.Now()
+
+ for _, spec := range row.Specs {
+ // Parse the spec and check for errors
+ schedule, err := cronParser.Parse(spec)
+ if err != nil {
+ continue // skip to the next spec if there's an error
+ }
+
+ // Insert the new schedule spec row
+ if err = db.Insert(ctx, &ActionScheduleSpec{
+ RepoID: row.RepoID,
+ ScheduleID: row.ID,
+ Spec: spec,
+ Next: timeutil.TimeStamp(schedule.Next(now).Unix()),
+ }); err != nil {
+ return err
+ }
+ }
+ }
+
+ // Commit transaction
+ return committer.Commit()
+}
+
+func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
+ ctx, committer, err := db.TxContext(ctx)
+ if err != nil {
+ return err
+ }
+ defer committer.Close()
+
+ if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil {
+ return err
+ }
+
+ if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil {
+ return err
+ }
+
+ return committer.Commit()
+}
diff --git a/models/actions/schedule_list.go b/models/actions/schedule_list.go
new file mode 100644
index 0000000000..e873c05ec3
--- /dev/null
+++ b/models/actions/schedule_list.go
@@ -0,0 +1,94 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+
+ "code.gitea.io/gitea/models/db"
+ repo_model "code.gitea.io/gitea/models/repo"
+ user_model "code.gitea.io/gitea/models/user"
+ "code.gitea.io/gitea/modules/container"
+
+ "xorm.io/builder"
+)
+
+type ScheduleList []*ActionSchedule
+
+// GetUserIDs returns a slice of user's id
+func (schedules ScheduleList) GetUserIDs() []int64 {
+ ids := make(container.Set[int64], len(schedules))
+ for _, schedule := range schedules {
+ ids.Add(schedule.TriggerUserID)
+ }
+ return ids.Values()
+}
+
+func (schedules ScheduleList) GetRepoIDs() []int64 {
+ ids := make(container.Set[int64], len(schedules))
+ for _, schedule := range schedules {
+ ids.Add(schedule.RepoID)
+ }
+ return ids.Values()
+}
+
+func (schedules ScheduleList) LoadTriggerUser(ctx context.Context) error {
+ userIDs := schedules.GetUserIDs()
+ users := make(map[int64]*user_model.User, len(userIDs))
+ if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil {
+ return err
+ }
+ for _, schedule := range schedules {
+ if schedule.TriggerUserID == user_model.ActionsUserID {
+ schedule.TriggerUser = user_model.NewActionsUser()
+ } else {
+ schedule.TriggerUser = users[schedule.TriggerUserID]
+ }
+ }
+ return nil
+}
+
+func (schedules ScheduleList) LoadRepos() error {
+ repoIDs := schedules.GetRepoIDs()
+ repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs)
+ if err != nil {
+ return err
+ }
+ for _, schedule := range schedules {
+ schedule.Repo = repos[schedule.RepoID]
+ }
+ return nil
+}
+
+type FindScheduleOptions struct {
+ db.ListOptions
+ RepoID int64
+ OwnerID int64
+}
+
+func (opts FindScheduleOptions) toConds() builder.Cond {
+ cond := builder.NewCond()
+ if opts.RepoID > 0 {
+ cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
+ }
+ if opts.OwnerID > 0 {
+ cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
+ }
+
+ return cond
+}
+
+func FindSchedules(ctx context.Context, opts FindScheduleOptions) (ScheduleList, int64, error) {
+ e := db.GetEngine(ctx).Where(opts.toConds())
+ if !opts.ListAll && opts.PageSize > 0 && opts.Page >= 1 {
+ e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize)
+ }
+ var schedules ScheduleList
+ total, err := e.Desc("id").FindAndCount(&schedules)
+ return schedules, total, err
+}
+
+func CountSchedules(ctx context.Context, opts FindScheduleOptions) (int64, error) {
+ return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionSchedule))
+}
diff --git a/models/actions/schedule_spec.go b/models/actions/schedule_spec.go
new file mode 100644
index 0000000000..91240459a0
--- /dev/null
+++ b/models/actions/schedule_spec.go
@@ -0,0 +1,50 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+
+ "code.gitea.io/gitea/models/db"
+ repo_model "code.gitea.io/gitea/models/repo"
+ "code.gitea.io/gitea/modules/timeutil"
+
+ "github.com/robfig/cron/v3"
+)
+
+// ActionScheduleSpec represents a schedule spec of a workflow file
+type ActionScheduleSpec struct {
+ ID int64
+ RepoID int64 `xorm:"index"`
+ Repo *repo_model.Repository `xorm:"-"`
+ ScheduleID int64 `xorm:"index"`
+ Schedule *ActionSchedule `xorm:"-"`
+
+ // Next time the job will run, or the zero time if Cron has not been
+ // started or this entry's schedule is unsatisfiable
+ Next timeutil.TimeStamp `xorm:"index"`
+ // Prev is the last time this job was run, or the zero time if never.
+ Prev timeutil.TimeStamp
+ Spec string
+
+ Created timeutil.TimeStamp `xorm:"created"`
+ Updated timeutil.TimeStamp `xorm:"updated"`
+}
+
+func (s *ActionScheduleSpec) Parse() (cron.Schedule, error) {
+ return cronParser.Parse(s.Spec)
+}
+
+func init() {
+ db.RegisterModel(new(ActionScheduleSpec))
+}
+
+func UpdateScheduleSpec(ctx context.Context, spec *ActionScheduleSpec, cols ...string) error {
+ sess := db.GetEngine(ctx).ID(spec.ID)
+ if len(cols) > 0 {
+ sess.Cols(cols...)
+ }
+ _, err := sess.Update(spec)
+ return err
+}
diff --git a/models/actions/schedule_spec_list.go b/models/actions/schedule_spec_list.go
new file mode 100644
index 0000000000..d379490b4e
--- /dev/null
+++ b/models/actions/schedule_spec_list.go
@@ -0,0 +1,106 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+
+ "code.gitea.io/gitea/models/db"
+ repo_model "code.gitea.io/gitea/models/repo"
+ "code.gitea.io/gitea/modules/container"
+
+ "xorm.io/builder"
+)
+
+type SpecList []*ActionScheduleSpec
+
+func (specs SpecList) GetScheduleIDs() []int64 {
+ ids := make(container.Set[int64], len(specs))
+ for _, spec := range specs {
+ ids.Add(spec.ScheduleID)
+ }
+ return ids.Values()
+}
+
+func (specs SpecList) LoadSchedules() error {
+ scheduleIDs := specs.GetScheduleIDs()
+ schedules, err := GetSchedulesMapByIDs(scheduleIDs)
+ if err != nil {
+ return err
+ }
+ for _, spec := range specs {
+ spec.Schedule = schedules[spec.ScheduleID]
+ }
+
+ repoIDs := specs.GetRepoIDs()
+ repos, err := GetReposMapByIDs(repoIDs)
+ if err != nil {
+ return err
+ }
+ for _, spec := range specs {
+ spec.Repo = repos[spec.RepoID]
+ }
+
+ return nil
+}
+
+func (specs SpecList) GetRepoIDs() []int64 {
+ ids := make(container.Set[int64], len(specs))
+ for _, spec := range specs {
+ ids.Add(spec.RepoID)
+ }
+ return ids.Values()
+}
+
+func (specs SpecList) LoadRepos() error {
+ repoIDs := specs.GetRepoIDs()
+ repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs)
+ if err != nil {
+ return err
+ }
+ for _, spec := range specs {
+ spec.Repo = repos[spec.RepoID]
+ }
+ return nil
+}
+
+type FindSpecOptions struct {
+ db.ListOptions
+ RepoID int64
+ Next int64
+}
+
+func (opts FindSpecOptions) toConds() builder.Cond {
+ cond := builder.NewCond()
+ if opts.RepoID > 0 {
+ cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
+ }
+
+ if opts.Next > 0 {
+ cond = cond.And(builder.Lte{"next": opts.Next})
+ }
+
+ return cond
+}
+
+func FindSpecs(ctx context.Context, opts FindSpecOptions) (SpecList, int64, error) {
+ e := db.GetEngine(ctx).Where(opts.toConds())
+ if opts.PageSize > 0 && opts.Page >= 1 {
+ e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize)
+ }
+ var specs SpecList
+ total, err := e.Desc("id").FindAndCount(&specs)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ if err := specs.LoadSchedules(); err != nil {
+ return nil, 0, err
+ }
+ return specs, total, nil
+}
+
+func CountSpecs(ctx context.Context, opts FindSpecOptions) (int64, error) {
+ return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionScheduleSpec))
+}
diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go
index 87c597b573..9f4acda236 100644
--- a/models/migrations/migrations.go
+++ b/models/migrations/migrations.go
@@ -526,6 +526,8 @@ var migrations = []Migration{
NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable),
// v272 -> v273
NewMigration("Add Version to ActionRun table", v1_21.AddVersionToActionRunTable),
+ // v273 -> v274
+ NewMigration("Add Action Schedule Table", v1_21.AddActionScheduleTable),
}
// GetCurrentDBVersion returns the current db version
diff --git a/models/migrations/v1_21/v273.go b/models/migrations/v1_21/v273.go
new file mode 100644
index 0000000000..61c79f4a76
--- /dev/null
+++ b/models/migrations/v1_21/v273.go
@@ -0,0 +1,45 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package v1_21 //nolint
+import (
+ "code.gitea.io/gitea/modules/timeutil"
+
+ "xorm.io/xorm"
+)
+
+func AddActionScheduleTable(x *xorm.Engine) error {
+ type ActionSchedule struct {
+ ID int64
+ Title string
+ Specs []string
+ RepoID int64 `xorm:"index"`
+ OwnerID int64 `xorm:"index"`
+ WorkflowID string
+ TriggerUserID int64
+ Ref string
+ CommitSHA string
+ Event string
+ EventPayload string `xorm:"LONGTEXT"`
+ Content []byte
+ Created timeutil.TimeStamp `xorm:"created"`
+ Updated timeutil.TimeStamp `xorm:"updated"`
+ }
+
+ type ActionScheduleSpec struct {
+ ID int64
+ RepoID int64 `xorm:"index"`
+ ScheduleID int64 `xorm:"index"`
+ Spec string
+ Next timeutil.TimeStamp `xorm:"index"`
+ Prev timeutil.TimeStamp
+
+ Created timeutil.TimeStamp `xorm:"created"`
+ Updated timeutil.TimeStamp `xorm:"updated"`
+ }
+
+ return x.Sync(
+ new(ActionSchedule),
+ new(ActionScheduleSpec),
+ )
+}
diff --git a/models/repo.go b/models/repo.go
index 7579d2ad73..74a88d4c48 100644
--- a/models/repo.go
+++ b/models/repo.go
@@ -170,6 +170,8 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error {
&actions_model.ActionRunJob{RepoID: repoID},
&actions_model.ActionRun{RepoID: repoID},
&actions_model.ActionRunner{RepoID: repoID},
+ &actions_model.ActionScheduleSpec{RepoID: repoID},
+ &actions_model.ActionSchedule{RepoID: repoID},
&actions_model.ActionArtifact{RepoID: repoID},
); err != nil {
return fmt.Errorf("deleteBeans: %w", err)
diff --git a/modules/actions/workflows.go b/modules/actions/workflows.go
index de340a74ec..408fdb8f8e 100644
--- a/modules/actions/workflows.go
+++ b/modules/actions/workflows.go
@@ -95,18 +95,25 @@ func GetEventsFromContent(content []byte) ([]*jobparser.Event, error) {
return events, nil
}
-func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader) ([]*DetectedWorkflow, error) {
+func DetectWorkflows(
+ gitRepo *git.Repository,
+ commit *git.Commit,
+ triggedEvent webhook_module.HookEventType,
+ payload api.Payloader,
+) ([]*DetectedWorkflow, []*DetectedWorkflow, error) {
entries, err := ListWorkflows(commit)
if err != nil {
- return nil, err
+ return nil, nil, err
}
workflows := make([]*DetectedWorkflow, 0, len(entries))
+ schedules := make([]*DetectedWorkflow, 0, len(entries))
for _, entry := range entries {
content, err := GetContentFromEntry(entry)
if err != nil {
- return nil, err
+ return nil, nil, err
}
+
events, err := GetEventsFromContent(content)
if err != nil {
log.Warn("ignore invalid workflow %q: %v", entry.Name(), err)
@@ -114,6 +121,14 @@ func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent w
}
for _, evt := range events {
log.Trace("detect workflow %q for event %#v matching %q", entry.Name(), evt, triggedEvent)
+ if evt.IsSchedule() {
+ dwf := &DetectedWorkflow{
+ EntryName: entry.Name(),
+ TriggerEvent: evt.Name,
+ Content: content,
+ }
+ schedules = append(schedules, dwf)
+ }
if detectMatched(gitRepo, commit, triggedEvent, payload, evt) {
dwf := &DetectedWorkflow{
EntryName: entry.Name(),
@@ -125,7 +140,7 @@ func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent w
}
}
- return workflows, nil
+ return workflows, schedules, nil
}
func detectMatched(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader, evt *jobparser.Event) bool {
diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini
index 587a2b14bc..f08d2b7eae 100644
--- a/options/locale/locale_en-US.ini
+++ b/options/locale/locale_en-US.ini
@@ -2756,6 +2756,7 @@ dashboard.gc_lfs = Garbage collect LFS meta objects
dashboard.stop_zombie_tasks = Stop zombie tasks
dashboard.stop_endless_tasks = Stop endless tasks
dashboard.cancel_abandoned_jobs = Cancel abandoned jobs
+dashboard.start_schedule_tasks = Start schedule tasks
dashboard.sync_branch.started = Branches Sync started
dashboard.rebuild_issue_indexer = Rebuild issue indexer
diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go
index 75c99ff19c..ff00e48c64 100644
--- a/services/actions/notifier_helper.go
+++ b/services/actions/notifier_helper.go
@@ -4,6 +4,7 @@
package actions
import (
+ "bytes"
"context"
"fmt"
"strings"
@@ -24,6 +25,7 @@ import (
"code.gitea.io/gitea/services/convert"
"github.com/nektos/act/pkg/jobparser"
+ "github.com/nektos/act/pkg/model"
)
var methodCtxKey struct{}
@@ -143,15 +145,15 @@ func notify(ctx context.Context, input *notifyInput) error {
}
var detectedWorkflows []*actions_module.DetectedWorkflow
- workflows, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
+ actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
+ workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
+
if len(workflows) == 0 {
log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
} else {
- actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
-
for _, wf := range workflows {
if actionsConfig.IsWorkflowDisabled(wf.EntryName) {
log.Trace("repo %s has disable workflows %s", input.Repo.RepoPath(), wf.EntryName)
@@ -171,7 +173,7 @@ func notify(ctx context.Context, input *notifyInput) error {
if err != nil {
return fmt.Errorf("gitRepo.GetCommit: %w", err)
}
- baseWorkflows, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
+ baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
@@ -186,7 +188,22 @@ func notify(ctx context.Context, input *notifyInput) error {
}
}
+ if err := handleSchedules(ctx, schedules, commit, input); err != nil {
+ return err
+ }
+
+ return handleWorkflows(ctx, detectedWorkflows, commit, input, ref)
+}
+
+func handleWorkflows(
+ ctx context.Context,
+ detectedWorkflows []*actions_module.DetectedWorkflow,
+ commit *git.Commit,
+ input *notifyInput,
+ ref string,
+) error {
if len(detectedWorkflows) == 0 {
+ log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID)
return nil
}
@@ -350,3 +367,86 @@ func ifNeedApproval(ctx context.Context, run *actions_model.ActionRun, repo *rep
log.Trace("need approval because it's the first time user %d triggered actions", user.ID)
return true, nil
}
+
+func handleSchedules(
+ ctx context.Context,
+ detectedWorkflows []*actions_module.DetectedWorkflow,
+ commit *git.Commit,
+ input *notifyInput,
+) error {
+ if len(detectedWorkflows) == 0 {
+ log.Trace("repo %s with commit %s couldn't find schedules", input.Repo.RepoPath(), commit.ID)
+ return nil
+ }
+
+ branch, err := commit.GetBranchName()
+ if err != nil {
+ return err
+ }
+ if branch != input.Repo.DefaultBranch {
+ log.Trace("commit branch is not default branch in repo")
+ return nil
+ }
+
+ rows, _, err := actions_model.FindSchedules(ctx, actions_model.FindScheduleOptions{RepoID: input.Repo.ID})
+ if err != nil {
+ log.Error("FindCrons: %v", err)
+ return err
+ }
+
+ if len(rows) > 0 {
+ if err := actions_model.DeleteScheduleTaskByRepo(ctx, input.Repo.ID); err != nil {
+ log.Error("DeleteCronTaskByRepo: %v", err)
+ }
+ }
+
+ p, err := json.Marshal(input.Payload)
+ if err != nil {
+ return fmt.Errorf("json.Marshal: %w", err)
+ }
+
+ crons := make([]*actions_model.ActionSchedule, 0, len(detectedWorkflows))
+ for _, dwf := range detectedWorkflows {
+ // Check cron job condition. Only working in default branch
+ workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content))
+ if err != nil {
+ log.Error("ReadWorkflow: %v", err)
+ continue
+ }
+ schedules := workflow.OnSchedule()
+ if len(schedules) == 0 {
+ log.Warn("no schedule event")
+ continue
+ }
+
+ run := &actions_model.ActionSchedule{
+ Title: strings.SplitN(commit.CommitMessage, "\n", 2)[0],
+ RepoID: input.Repo.ID,
+ OwnerID: input.Repo.OwnerID,
+ WorkflowID: dwf.EntryName,
+ TriggerUserID: input.Doer.ID,
+ Ref: input.Ref,
+ CommitSHA: commit.ID.String(),
+ Event: input.Event,
+ EventPayload: string(p),
+ Specs: schedules,
+ Content: dwf.Content,
+ }
+
+ // cancel running jobs if the event is push
+ if run.Event == webhook_module.HookEventPush {
+ // cancel running jobs of the same workflow
+ if err := actions_model.CancelRunningJobs(
+ ctx,
+ run.RepoID,
+ run.Ref,
+ run.WorkflowID,
+ ); err != nil {
+ log.Error("CancelRunningJobs: %v", err)
+ }
+ }
+ crons = append(crons, run)
+ }
+
+ return actions_model.CreateScheduleTask(ctx, crons)
+}
diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go
new file mode 100644
index 0000000000..87131e0aab
--- /dev/null
+++ b/services/actions/schedule_tasks.go
@@ -0,0 +1,135 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ actions_model "code.gitea.io/gitea/models/actions"
+ "code.gitea.io/gitea/models/db"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/timeutil"
+ webhook_module "code.gitea.io/gitea/modules/webhook"
+
+ "github.com/nektos/act/pkg/jobparser"
+)
+
+// StartScheduleTasks start the task
+func StartScheduleTasks(ctx context.Context) error {
+ return startTasks(ctx)
+}
+
+// startTasks retrieves specifications in pages, creates a schedule task for each specification,
+// and updates the specification's next run time and previous run time.
+// The function returns an error if there's an issue with finding or updating the specifications.
+func startTasks(ctx context.Context) error {
+ // Set the page size
+ pageSize := 50
+
+ // Retrieve specs in pages until all specs have been retrieved
+ now := time.Now()
+ for page := 1; ; page++ {
+ // Retrieve the specs for the current page
+ specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
+ ListOptions: db.ListOptions{
+ Page: page,
+ PageSize: pageSize,
+ },
+ Next: now.Unix(),
+ })
+ if err != nil {
+ return fmt.Errorf("find specs: %w", err)
+ }
+
+ // Loop through each spec and create a schedule task for it
+ for _, row := range specs {
+ // cancel running jobs if the event is push
+ if row.Schedule.Event == webhook_module.HookEventPush {
+ // cancel running jobs of the same workflow
+ if err := actions_model.CancelRunningJobs(
+ ctx,
+ row.RepoID,
+ row.Schedule.Ref,
+ row.Schedule.WorkflowID,
+ ); err != nil {
+ log.Error("CancelRunningJobs: %v", err)
+ }
+ }
+
+ if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
+ log.Error("CreateScheduleTask: %v", err)
+ return err
+ }
+
+ // Parse the spec
+ schedule, err := row.Parse()
+ if err != nil {
+ log.Error("Parse: %v", err)
+ return err
+ }
+
+ // Update the spec's next run time and previous run time
+ row.Prev = row.Next
+ row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
+ if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
+ log.Error("UpdateScheduleSpec: %v", err)
+ return err
+ }
+ }
+
+ // Stop if all specs have been retrieved
+ if len(specs) < pageSize {
+ break
+ }
+ }
+
+ return nil
+}
+
+// CreateScheduleTask creates a scheduled task from a cron action schedule.
+// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
+func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
+ // Create a new action run based on the schedule
+ run := &actions_model.ActionRun{
+ Title: cron.Title,
+ RepoID: cron.RepoID,
+ OwnerID: cron.OwnerID,
+ WorkflowID: cron.WorkflowID,
+ TriggerUserID: cron.TriggerUserID,
+ Ref: cron.Ref,
+ CommitSHA: cron.CommitSHA,
+ Event: cron.Event,
+ EventPayload: cron.EventPayload,
+ Status: actions_model.StatusWaiting,
+ }
+
+ // Parse the workflow specification from the cron schedule
+ workflows, err := jobparser.Parse(cron.Content)
+ if err != nil {
+ return err
+ }
+
+ // Insert the action run and its associated jobs into the database
+ if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
+ return err
+ }
+
+ // Retrieve the jobs for the newly created action run
+ jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID})
+ if err != nil {
+ return err
+ }
+
+ // Create commit statuses for each job
+ for _, job := range jobs {
+ if err := createCommitStatus(ctx, job); err != nil {
+ return err
+ }
+ }
+
+ // Return nil if no errors occurred
+ return nil
+}
diff --git a/services/cron/tasks_actions.go b/services/cron/tasks_actions.go
index 30e8749a5e..0875792503 100644
--- a/services/cron/tasks_actions.go
+++ b/services/cron/tasks_actions.go
@@ -18,6 +18,7 @@ func initActionsTasks() {
registerStopZombieTasks()
registerStopEndlessTasks()
registerCancelAbandonedJobs()
+ registerScheduleTasks()
}
func registerStopZombieTasks() {
@@ -49,3 +50,16 @@ func registerCancelAbandonedJobs() {
return actions_service.CancelAbandonedJobs(ctx)
})
}
+
+// registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks.
+func registerScheduleTasks() {
+ // Register the task with a unique name, enabled status, and schedule for every minute.
+ RegisterTaskFatal("start_schedule_tasks", &BaseConfig{
+ Enabled: true,
+ RunAtStart: false,
+ Schedule: "@every 1m",
+ }, func(ctx context.Context, _ *user_model.User, cfg Config) error {
+ // Call the function to start schedule tasks and pass the context.
+ return actions_service.StartScheduleTasks(ctx)
+ })
+}