diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2021-11-16 21:30:11 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-16 21:30:11 +0800 |
commit | 48ccd325a1b81a58ac6d1d5d94fc4e90974599ea (patch) | |
tree | e7bb31d69343dd9845a1b0df013b093c8720f14b /services/cron/tasks.go | |
parent | 447428f44659cee0e94e1c444f6291defac2bda2 (diff) | |
download | gitea-48ccd325a1b81a58ac6d1d5d94fc4e90974599ea.tar.gz gitea-48ccd325a1b81a58ac6d1d5d94fc4e90974599ea.zip |
Move some functions into services/repository (#17660)
Diffstat (limited to 'services/cron/tasks.go')
-rw-r--r-- | services/cron/tasks.go | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/services/cron/tasks.go b/services/cron/tasks.go new file mode 100644 index 0000000000..56c363e0b8 --- /dev/null +++ b/services/cron/tasks.go @@ -0,0 +1,169 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package cron + +import ( + "context" + "fmt" + "reflect" + "sync" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" +) + +var lock = sync.Mutex{} +var started = false +var tasks = []*Task{} +var tasksMap = map[string]*Task{} + +// Task represents a Cron task +type Task struct { + lock sync.Mutex + Name string + config Config + fun func(context.Context, *models.User, Config) error + ExecTimes int64 +} + +// DoRunAtStart returns if this task should run at the start +func (t *Task) DoRunAtStart() bool { + return t.config.DoRunAtStart() +} + +// IsEnabled returns if this task is enabled as cron task +func (t *Task) IsEnabled() bool { + return t.config.IsEnabled() +} + +// GetConfig will return a copy of the task's config +func (t *Task) GetConfig() Config { + if reflect.TypeOf(t.config).Kind() == reflect.Ptr { + // Pointer: + return reflect.New(reflect.ValueOf(t.config).Elem().Type()).Interface().(Config) + } + // Not pointer: + return reflect.New(reflect.TypeOf(t.config)).Elem().Interface().(Config) +} + +// Run will run the task incrementing the cron counter with no user defined +func (t *Task) Run() { + t.RunWithUser(&models.User{ + ID: -1, + Name: "(Cron)", + LowerName: "(cron)", + }, t.config) +} + +// RunWithUser will run the task incrementing the cron counter at the time with User +func (t *Task) RunWithUser(doer *models.User, config Config) { + if !taskStatusTable.StartIfNotRunning(t.Name) { + return + } + t.lock.Lock() + if config == nil { + config = t.config + } + t.ExecTimes++ + t.lock.Unlock() + defer func() { + taskStatusTable.Stop(t.Name) + if err := recover(); err != nil { + // Recover a panic within the + combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) + log.Error("PANIC whilst running task: %s Value: %v", t.Name, combinedErr) + } + }() + graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) { + ctx, cancel := context.WithCancel(baseCtx) + defer cancel() + pm := process.GetManager() + pid := pm.Add(config.FormatMessage(t.Name, "process", doer), cancel) + defer pm.Remove(pid) + if err := t.fun(ctx, doer, config); err != nil { + if db.IsErrCancelled(err) { + message := err.(db.ErrCancelled).Message + if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "aborted", doer, message)); err != nil { + log.Error("CreateNotice: %v", err) + } + return + } + if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "error", doer, err)); err != nil { + log.Error("CreateNotice: %v", err) + } + return + } + if config.DoNoticeOnSuccess() { + if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "finished", doer)); err != nil { + log.Error("CreateNotice: %v", err) + } + } + }) +} + +// GetTask gets the named task +func GetTask(name string) *Task { + lock.Lock() + defer lock.Unlock() + log.Info("Getting %s in %v", name, tasksMap[name]) + + return tasksMap[name] +} + +// RegisterTask allows a task to be registered with the cron service +func RegisterTask(name string, config Config, fun func(context.Context, *models.User, Config) error) error { + log.Debug("Registering task: %s", name) + _, err := setting.GetCronSettings(name, config) + if err != nil { + log.Error("Unable to register cron task with name: %s Error: %v", name, err) + return err + } + + task := &Task{ + Name: name, + config: config, + fun: fun, + } + lock.Lock() + locked := true + defer func() { + if locked { + lock.Unlock() + } + }() + if _, has := tasksMap[task.Name]; has { + log.Error("A task with this name: %s has already been registered", name) + return fmt.Errorf("duplicate task with name: %s", task.Name) + } + + if config.IsEnabled() { + // We cannot use the entry return as there is no way to lock it + if _, err = c.AddJob(name, config.GetSchedule(), task); err != nil { + log.Error("Unable to register cron task with name: %s Error: %v", name, err) + return err + } + } + + tasks = append(tasks, task) + tasksMap[task.Name] = task + if started && config.IsEnabled() && config.DoRunAtStart() { + lock.Unlock() + locked = false + task.Run() + } + + return nil +} + +// RegisterTaskFatal will register a task but if there is an error log.Fatal +func RegisterTaskFatal(name string, config Config, fun func(context.Context, *models.User, Config) error) { + if err := RegisterTask(name, config, fun); err != nil { + log.Fatal("Unable to register cron task %s Error: %v", name, err) + } +} |