diff options
author | zeripath <art27@cantab.net> | 2020-05-17 00:31:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-16 19:31:38 -0400 |
commit | 9a2e47b23a6d460acfce9b1b77e6f9fb06ca1b75 (patch) | |
tree | b1852472e1ecf6bdb1822b41655bdaf8afd87c1b /modules/cron | |
parent | c18144086f9d4a06adbd4a7c08cfa6dab91224ec (diff) | |
download | gitea-9a2e47b23a6d460acfce9b1b77e6f9fb06ca1b75.tar.gz gitea-9a2e47b23a6d460acfce9b1b77e6f9fb06ca1b75.zip |
Refactor Cron and merge dashboard tasks (#10745)
* Refactor Cron and merge dashboard tasks
* Merge Cron and Dashboard tasks
* Make every cron task report a system notice on completion
* Refactor the creation of these tasks
* Ensure that execution counts of tasks is correct
* Allow cron tasks to be started from the cron page
* golangci-lint fixes
* Enforce that only one task with the same name can be registered
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix name check
Signed-off-by: Andrew Thornton <art27@cantab.net>
* as per @guillep2k
* as per @lafriks
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add git.CommandContext variants
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Diffstat (limited to 'modules/cron')
-rw-r--r-- | modules/cron/cron.go | 177 | ||||
-rw-r--r-- | modules/cron/setting.go | 72 | ||||
-rw-r--r-- | modules/cron/tasks.go | 166 | ||||
-rw-r--r-- | modules/cron/tasks_basic.go | 118 | ||||
-rw-r--r-- | modules/cron/tasks_extended.go | 119 |
5 files changed, 535 insertions, 117 deletions
diff --git a/modules/cron/cron.go b/modules/cron/cron.go index 692642e4ce..ae309bd866 100644 --- a/modules/cron/cron.go +++ b/modules/cron/cron.go @@ -9,143 +9,86 @@ import ( "context" "time" - "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/migrations" - repo_module "code.gitea.io/gitea/modules/repository" - "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/sync" - mirror_service "code.gitea.io/gitea/services/mirror" "github.com/gogs/cron" ) -const ( - mirrorUpdate = "mirror_update" - gitFsck = "git_fsck" - checkRepos = "check_repos" - archiveCleanup = "archive_cleanup" - syncExternalUsers = "sync_external_users" - deletedBranchesCleanup = "deleted_branches_cleanup" - updateMigrationPosterID = "update_migration_post_id" -) - var c = cron.New() // Prevent duplicate running tasks. var taskStatusTable = sync.NewStatusTable() -// Func defines a cron function body -type Func func() - -// WithUnique wrap a cron func with an unique running check -func WithUnique(name string, body func(context.Context)) Func { - return func() { - if !taskStatusTable.StartIfNotRunning(name) { - return - } - defer taskStatusTable.Stop(name) - graceful.GetManager().RunWithShutdownContext(body) - } -} - // NewContext begins cron tasks // Each cron task is run within the shutdown context as a running server // AtShutdown the cron server is stopped func NewContext() { - var ( - entry *cron.Entry - err error - ) - if setting.Cron.UpdateMirror.Enabled { - entry, err = c.AddFunc("Update mirrors", setting.Cron.UpdateMirror.Schedule, WithUnique(mirrorUpdate, mirror_service.Update)) - if err != nil { - log.Fatal("Cron[Update mirrors]: %v", err) - } - if setting.Cron.UpdateMirror.RunAtStart { - entry.Prev = time.Now() - entry.ExecTimes++ - go WithUnique(mirrorUpdate, mirror_service.Update)() - } - } - if setting.Cron.RepoHealthCheck.Enabled { - entry, err = c.AddFunc("Repository health check", setting.Cron.RepoHealthCheck.Schedule, WithUnique(gitFsck, func(ctx context.Context) { - if err := repo_module.GitFsck(ctx); err != nil { - log.Error("GitFsck: %s", err) - } - })) - if err != nil { - log.Fatal("Cron[Repository health check]: %v", err) - } - if setting.Cron.RepoHealthCheck.RunAtStart { - entry.Prev = time.Now() - entry.ExecTimes++ - go WithUnique(gitFsck, func(ctx context.Context) { - if err := repo_module.GitFsck(ctx); err != nil { - log.Error("GitFsck: %s", err) - } - })() - } - } - if setting.Cron.CheckRepoStats.Enabled { - entry, err = c.AddFunc("Check repository statistics", setting.Cron.CheckRepoStats.Schedule, WithUnique(checkRepos, models.CheckRepoStats)) - if err != nil { - log.Fatal("Cron[Check repository statistics]: %v", err) - } - if setting.Cron.CheckRepoStats.RunAtStart { - entry.Prev = time.Now() - entry.ExecTimes++ - go WithUnique(checkRepos, models.CheckRepoStats)() - } - } - if setting.Cron.ArchiveCleanup.Enabled { - entry, err = c.AddFunc("Clean up old repository archives", setting.Cron.ArchiveCleanup.Schedule, WithUnique(archiveCleanup, models.DeleteOldRepositoryArchives)) - if err != nil { - log.Fatal("Cron[Clean up old repository archives]: %v", err) - } - if setting.Cron.ArchiveCleanup.RunAtStart { - entry.Prev = time.Now() - entry.ExecTimes++ - go WithUnique(archiveCleanup, models.DeleteOldRepositoryArchives)() - } - } - if setting.Cron.SyncExternalUsers.Enabled { - entry, err = c.AddFunc("Synchronize external users", setting.Cron.SyncExternalUsers.Schedule, WithUnique(syncExternalUsers, models.SyncExternalUsers)) - if err != nil { - log.Fatal("Cron[Synchronize external users]: %v", err) - } - if setting.Cron.SyncExternalUsers.RunAtStart { - entry.Prev = time.Now() - entry.ExecTimes++ - go WithUnique(syncExternalUsers, models.SyncExternalUsers)() - } - } - if setting.Cron.DeletedBranchesCleanup.Enabled { - entry, err = c.AddFunc("Remove old deleted branches", setting.Cron.DeletedBranchesCleanup.Schedule, WithUnique(deletedBranchesCleanup, models.RemoveOldDeletedBranches)) - if err != nil { - log.Fatal("Cron[Remove old deleted branches]: %v", err) - } - if setting.Cron.DeletedBranchesCleanup.RunAtStart { - entry.Prev = time.Now() - entry.ExecTimes++ - go WithUnique(deletedBranchesCleanup, models.RemoveOldDeletedBranches)() - } - } + initBasicTasks() + initExtendedTasks() - entry, err = c.AddFunc("Update migrated repositories' issues and comments' posterid", setting.Cron.UpdateMigrationPosterID.Schedule, WithUnique(updateMigrationPosterID, migrations.UpdateMigrationPosterID)) - if err != nil { - log.Fatal("Cron[Update migrated repositories]: %v", err) + lock.Lock() + for _, task := range tasks { + if task.IsEnabled() && task.DoRunAtStart() { + go task.Run() + } } - entry.Prev = time.Now() - entry.ExecTimes++ - go WithUnique(updateMigrationPosterID, migrations.UpdateMigrationPosterID)() c.Start() - graceful.GetManager().RunAtShutdown(context.Background(), c.Stop) + started = true + lock.Unlock() + graceful.GetManager().RunAtShutdown(context.Background(), func() { + c.Stop() + lock.Lock() + started = false + lock.Unlock() + }) + +} + +// TaskTableRow represents a task row in the tasks table +type TaskTableRow struct { + Name string + Spec string + Next time.Time + Prev time.Time + ExecTimes int64 } +// TaskTable represents a table of tasks +type TaskTable []*TaskTableRow + // ListTasks returns all running cron tasks. -func ListTasks() []*cron.Entry { - return c.Entries() +func ListTasks() TaskTable { + entries := c.Entries() + eMap := map[string]*cron.Entry{} + for _, e := range entries { + eMap[e.Description] = e + } + lock.Lock() + defer lock.Unlock() + tTable := make([]*TaskTableRow, 0, len(tasks)) + for _, task := range tasks { + spec := "-" + var ( + next time.Time + prev time.Time + ) + if e, ok := eMap[task.Name]; ok { + spec = e.Spec + next = e.Next + prev = e.Prev + } + task.lock.Lock() + tTable = append(tTable, &TaskTableRow{ + Name: task.Name, + Spec: spec, + Next: next, + Prev: prev, + ExecTimes: task.ExecTimes, + }) + task.lock.Unlock() + } + + return tTable } diff --git a/modules/cron/setting.go b/modules/cron/setting.go new file mode 100644 index 0000000000..dd93d03986 --- /dev/null +++ b/modules/cron/setting.go @@ -0,0 +1,72 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package cron + +import ( + "time" + + "code.gitea.io/gitea/models" + "github.com/unknwon/i18n" +) + +// Config represents a basic configuration interface that cron task +type Config interface { + IsEnabled() bool + DoRunAtStart() bool + GetSchedule() string + FormatMessage(name, status string, doer *models.User, args ...interface{}) string +} + +// BaseConfig represents the basic config for a Cron task +type BaseConfig struct { + Enabled bool + RunAtStart bool + Schedule string +} + +// OlderThanConfig represents a cron task with OlderThan setting +type OlderThanConfig struct { + BaseConfig + OlderThan time.Duration +} + +// UpdateExistingConfig represents a cron task with UpdateExisting setting +type UpdateExistingConfig struct { + BaseConfig + UpdateExisting bool +} + +// GetSchedule returns the schedule for the base config +func (b *BaseConfig) GetSchedule() string { + return b.Schedule +} + +// IsEnabled returns the enabled status for the config +func (b *BaseConfig) IsEnabled() bool { + return b.Enabled +} + +// DoRunAtStart returns whether the task should be run at the start +func (b *BaseConfig) DoRunAtStart() bool { + return b.RunAtStart +} + +// FormatMessage returns a message for the task +func (b *BaseConfig) FormatMessage(name, status string, doer *models.User, args ...interface{}) string { + realArgs := make([]interface{}, 0, len(args)+2) + realArgs = append(realArgs, i18n.Tr("en-US", "admin.dashboard."+name)) + if doer == nil { + realArgs = append(realArgs, "(Cron)") + } else { + realArgs = append(realArgs, doer.Name) + } + if len(args) > 0 { + realArgs = append(realArgs, args...) + } + if doer == nil || (doer.ID == -1 && doer.Name == "(Cron)") { + return i18n.Tr("en-US", "admin.dashboard.cron."+status, realArgs...) + } + return i18n.Tr("en-US", "admin.dashboard.task."+status, realArgs...) +} diff --git a/modules/cron/tasks.go b/modules/cron/tasks.go new file mode 100644 index 0000000000..a97326bd0f --- /dev/null +++ b/modules/cron/tasks.go @@ -0,0 +1,166 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package cron + +import ( + "context" + "fmt" + "reflect" + "sync" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" +) + +var lock = sync.Mutex{} +var started = false +var tasks = []*Task{} +var tasksMap = map[string]*Task{} + +// Task represents a Cron task +type Task struct { + lock sync.Mutex + Name string + config Config + fun func(context.Context, *models.User, Config) error + ExecTimes int64 +} + +// DoRunAtStart returns if this task should run at the start +func (t *Task) DoRunAtStart() bool { + return t.config.DoRunAtStart() +} + +// IsEnabled returns if this task is enabled as cron task +func (t *Task) IsEnabled() bool { + return t.config.IsEnabled() +} + +// GetConfig will return a copy of the task's config +func (t *Task) GetConfig() Config { + if reflect.TypeOf(t.config).Kind() == reflect.Ptr { + // Pointer: + return reflect.New(reflect.ValueOf(t.config).Elem().Type()).Interface().(Config) + } + // Not pointer: + return reflect.New(reflect.TypeOf(t.config)).Elem().Interface().(Config) +} + +// Run will run the task incrementing the cron counter with no user defined +func (t *Task) Run() { + t.RunWithUser(&models.User{ + ID: -1, + Name: "(Cron)", + LowerName: "(cron)", + }, t.config) +} + +// RunWithUser will run the task incrementing the cron counter at the time with User +func (t *Task) RunWithUser(doer *models.User, config Config) { + if !taskStatusTable.StartIfNotRunning(t.Name) { + return + } + t.lock.Lock() + if config == nil { + config = t.config + } + t.ExecTimes++ + t.lock.Unlock() + defer func() { + taskStatusTable.Stop(t.Name) + if err := recover(); err != nil { + // Recover a panic within the + combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) + log.Error("PANIC whilst running task: %s Value: %v", t.Name, combinedErr) + } + }() + graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) { + ctx, cancel := context.WithCancel(baseCtx) + defer cancel() + pm := process.GetManager() + pid := pm.Add(config.FormatMessage(t.Name, "process", doer), cancel) + defer pm.Remove(pid) + if err := t.fun(ctx, doer, config); err != nil { + if models.IsErrCancelled(err) { + message := err.(models.ErrCancelled).Message + if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "aborted", doer, message)); err != nil { + log.Error("CreateNotice: %v", err) + } + return + } + if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "error", doer, err)); err != nil { + log.Error("CreateNotice: %v", err) + } + return + } + if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "finished", doer)); err != nil { + log.Error("CreateNotice: %v", err) + } + }) +} + +// GetTask gets the named task +func GetTask(name string) *Task { + lock.Lock() + defer lock.Unlock() + log.Info("Getting %s in %v", name, tasksMap[name]) + + return tasksMap[name] +} + +// RegisterTask allows a task to be registered with the cron service +func RegisterTask(name string, config Config, fun func(context.Context, *models.User, Config) error) error { + log.Debug("Registering task: %s", name) + _, err := setting.GetCronSettings(name, config) + if err != nil { + log.Error("Unable to register cron task with name: %s Error: %v", name, err) + return err + } + + task := &Task{ + Name: name, + config: config, + fun: fun, + } + lock.Lock() + locked := true + defer func() { + if locked { + lock.Unlock() + } + }() + if _, has := tasksMap[task.Name]; has { + log.Error("A task with this name: %s has already been registered", name) + return fmt.Errorf("duplicate task with name: %s", task.Name) + } + + if config.IsEnabled() { + // We cannot use the entry return as there is no way to lock it + if _, err = c.AddJob(name, config.GetSchedule(), task); err != nil { + log.Error("Unable to register cron task with name: %s Error: %v", name, err) + return err + } + } + + tasks = append(tasks, task) + tasksMap[task.Name] = task + if started && config.IsEnabled() && config.DoRunAtStart() { + lock.Unlock() + locked = false + task.Run() + } + + return nil +} + +// RegisterTaskFatal will register a task but if there is an error log.Fatal +func RegisterTaskFatal(name string, config Config, fun func(context.Context, *models.User, Config) error) { + if err := RegisterTask(name, config, fun); err != nil { + log.Fatal("Unable to register cron task %s Error: %v", name, err) + } +} diff --git a/modules/cron/tasks_basic.go b/modules/cron/tasks_basic.go new file mode 100644 index 0000000000..438c4a5004 --- /dev/null +++ b/modules/cron/tasks_basic.go @@ -0,0 +1,118 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package cron + +import ( + "context" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/migrations" + repository_service "code.gitea.io/gitea/modules/repository" + mirror_service "code.gitea.io/gitea/services/mirror" +) + +func registerUpdateMirrorTask() { + RegisterTaskFatal("update_mirrors", &BaseConfig{ + Enabled: true, + RunAtStart: false, + Schedule: "@every 10m", + }, func(ctx context.Context, _ *models.User, _ Config) error { + return mirror_service.Update(ctx) + }) +} + +func registerRepoHealthCheck() { + type RepoHealthCheckConfig struct { + BaseConfig + Timeout time.Duration + Args []string `delim:" "` + } + RegisterTaskFatal("repo_health_check", &RepoHealthCheckConfig{ + BaseConfig: BaseConfig{ + Enabled: true, + RunAtStart: false, + Schedule: "@every 24h", + }, + Timeout: 60 * time.Second, + Args: []string{}, + }, func(ctx context.Context, _ *models.User, config Config) error { + rhcConfig := config.(*RepoHealthCheckConfig) + return repository_service.GitFsck(ctx, rhcConfig.Timeout, rhcConfig.Args) + }) +} + +func registerCheckRepoStats() { + RegisterTaskFatal("check_repo_stats", &BaseConfig{ + Enabled: true, + RunAtStart: true, + Schedule: "@every 24h", + }, func(ctx context.Context, _ *models.User, _ Config) error { + return models.CheckRepoStats(ctx) + }) +} + +func registerArchiveCleanup() { + RegisterTaskFatal("archive_cleanup", &OlderThanConfig{ + BaseConfig: BaseConfig{ + Enabled: true, + RunAtStart: true, + Schedule: "@every 24h", + }, + OlderThan: 24 * time.Hour, + }, func(ctx context.Context, _ *models.User, config Config) error { + acConfig := config.(*OlderThanConfig) + return models.DeleteOldRepositoryArchives(ctx, acConfig.OlderThan) + }) +} + +func registerSyncExternalUsers() { + RegisterTaskFatal("sync_external_users", &UpdateExistingConfig{ + BaseConfig: BaseConfig{ + Enabled: true, + RunAtStart: false, + Schedule: "@every 24h", + }, + UpdateExisting: true, + }, func(ctx context.Context, _ *models.User, config Config) error { + realConfig := config.(*UpdateExistingConfig) + return models.SyncExternalUsers(ctx, realConfig.UpdateExisting) + }) +} + +func registerDeletedBranchesCleanup() { + RegisterTaskFatal("deleted_branches_cleanup", &OlderThanConfig{ + BaseConfig: BaseConfig{ + Enabled: true, + RunAtStart: true, + Schedule: "@every 24h", + }, + OlderThan: 24 * time.Hour, + }, func(ctx context.Context, _ *models.User, config Config) error { + realConfig := config.(*OlderThanConfig) + models.RemoveOldDeletedBranches(ctx, realConfig.OlderThan) + return nil + }) +} + +func registerUpdateMigrationPosterID() { + RegisterTaskFatal("update_migration_poster_id", &BaseConfig{ + Enabled: true, + RunAtStart: true, + Schedule: "@every 24h", + }, func(ctx context.Context, _ *models.User, _ Config) error { + return migrations.UpdateMigrationPosterID(ctx) + }) +} + +func initBasicTasks() { + registerUpdateMirrorTask() + registerRepoHealthCheck() + registerCheckRepoStats() + registerArchiveCleanup() + registerSyncExternalUsers() + registerDeletedBranchesCleanup() + registerUpdateMigrationPosterID() +} diff --git a/modules/cron/tasks_extended.go b/modules/cron/tasks_extended.go new file mode 100644 index 0000000000..fa2d6e0c38 --- /dev/null +++ b/modules/cron/tasks_extended.go @@ -0,0 +1,119 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package cron + +import ( + "context" + "time" + + "code.gitea.io/gitea/models" + repo_module "code.gitea.io/gitea/modules/repository" + "code.gitea.io/gitea/modules/setting" +) + +func registerDeleteInactiveUsers() { + RegisterTaskFatal("delete_inactive_accounts", &OlderThanConfig{ + BaseConfig: BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@annually", + }, + OlderThan: 0 * time.Second, + }, func(ctx context.Context, _ *models.User, config Config) error { + olderThanConfig := config.(*OlderThanConfig) + return models.DeleteInactiveUsers(ctx, olderThanConfig.OlderThan) + }) +} + +func registerDeleteRepositoryArchives() { + RegisterTaskFatal("delete_repo_archives", &BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@annually", + }, func(ctx context.Context, _ *models.User, _ Config) error { + return models.DeleteRepositoryArchives(ctx) + }) +} + +func registerGarbageCollectRepositories() { + type RepoHealthCheckConfig struct { + BaseConfig + Timeout time.Duration + Args []string `delim:" "` + } + RegisterTaskFatal("git_gc_repos", &RepoHealthCheckConfig{ + BaseConfig: BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@every 72h", + }, + Timeout: time.Duration(setting.Git.Timeout.GC) * time.Second, + Args: setting.Git.GCArgs, + }, func(ctx context.Context, _ *models.User, config Config) error { + rhcConfig := config.(*RepoHealthCheckConfig) + return repo_module.GitGcRepos(ctx, rhcConfig.Timeout, rhcConfig.Args...) + }) +} + +func registerRewriteAllPublicKeys() { + RegisterTaskFatal("resync_all_sshkeys", &BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@every 72h", + }, func(_ context.Context, _ *models.User, _ Config) error { + return models.RewriteAllPublicKeys() + }) +} + +func registerRepositoryUpdateHook() { + RegisterTaskFatal("resync_all_hooks", &BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@every 72h", + }, func(ctx context.Context, _ *models.User, _ Config) error { + return repo_module.SyncRepositoryHooks(ctx) + }) +} + +func registerReinitMissingRepositories() { + RegisterTaskFatal("reinit_missing_repos", &BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@every 72h", + }, func(ctx context.Context, _ *models.User, _ Config) error { + return repo_module.ReinitMissingRepositories(ctx) + }) +} + +func registerDeleteMissingRepositories() { + RegisterTaskFatal("delete_missing_repos", &BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@every 72h", + }, func(ctx context.Context, user *models.User, _ Config) error { + return repo_module.DeleteMissingRepositories(ctx, user) + }) +} + +func registerRemoveRandomAvatars() { + RegisterTaskFatal("delete_generated_repository_avatars", &BaseConfig{ + Enabled: false, + RunAtStart: false, + Schedule: "@every 72h", + }, func(ctx context.Context, _ *models.User, _ Config) error { + return models.RemoveRandomAvatars(ctx) + }) +} + +func initExtendedTasks() { + registerDeleteInactiveUsers() + registerDeleteRepositoryArchives() + registerGarbageCollectRepositories() + registerRewriteAllPublicKeys() + registerRepositoryUpdateHook() + registerReinitMissingRepositories() + registerDeleteMissingRepositories() + registerRemoveRandomAvatars() +} |