You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

tasks.go 6.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package cron
  4. import (
  5. "context"
  6. "fmt"
  7. "reflect"
  8. "strings"
  9. "sync"
  10. "time"
  11. "code.gitea.io/gitea/models/db"
  12. system_model "code.gitea.io/gitea/models/system"
  13. user_model "code.gitea.io/gitea/models/user"
  14. "code.gitea.io/gitea/modules/graceful"
  15. "code.gitea.io/gitea/modules/log"
  16. "code.gitea.io/gitea/modules/process"
  17. "code.gitea.io/gitea/modules/setting"
  18. "code.gitea.io/gitea/modules/translation"
  19. )
  20. var (
  21. lock = sync.Mutex{}
  22. started = false
  23. tasks = []*Task{}
  24. tasksMap = map[string]*Task{}
  25. )
  26. // Task represents a Cron task
  27. type Task struct {
  28. lock sync.Mutex
  29. Name string
  30. config Config
  31. fun func(context.Context, *user_model.User, Config) error
  32. Status string
  33. LastMessage string
  34. LastDoer string
  35. ExecTimes int64
  36. // This stores the time of the last manual run of this task.
  37. LastRun time.Time
  38. }
  39. // DoRunAtStart returns if this task should run at the start
  40. func (t *Task) DoRunAtStart() bool {
  41. return t.config.DoRunAtStart()
  42. }
  43. // IsEnabled returns if this task is enabled as cron task
  44. func (t *Task) IsEnabled() bool {
  45. return t.config.IsEnabled()
  46. }
  47. // GetConfig will return a copy of the task's config
  48. func (t *Task) GetConfig() Config {
  49. if reflect.TypeOf(t.config).Kind() == reflect.Ptr {
  50. // Pointer:
  51. return reflect.New(reflect.ValueOf(t.config).Elem().Type()).Interface().(Config)
  52. }
  53. // Not pointer:
  54. return reflect.New(reflect.TypeOf(t.config)).Elem().Interface().(Config)
  55. }
  56. // Run will run the task incrementing the cron counter with no user defined
  57. func (t *Task) Run() {
  58. t.RunWithUser(&user_model.User{
  59. ID: -1,
  60. Name: "(Cron)",
  61. LowerName: "(cron)",
  62. }, t.config)
  63. }
  64. // RunWithUser will run the task incrementing the cron counter at the time with User
  65. func (t *Task) RunWithUser(doer *user_model.User, config Config) {
  66. if !taskStatusTable.StartIfNotRunning(t.Name) {
  67. return
  68. }
  69. t.lock.Lock()
  70. if config == nil {
  71. config = t.config
  72. }
  73. t.ExecTimes++
  74. t.lock.Unlock()
  75. defer func() {
  76. taskStatusTable.Stop(t.Name)
  77. }()
  78. graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) {
  79. defer func() {
  80. if err := recover(); err != nil {
  81. // Recover a panic within the execution of the task.
  82. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  83. log.Error("PANIC whilst running task: %s Value: %v", t.Name, combinedErr)
  84. }
  85. }()
  86. // Store the time of this run, before the function is executed, so it
  87. // matches the behavior of what the cron library does.
  88. t.lock.Lock()
  89. t.LastRun = time.Now()
  90. t.lock.Unlock()
  91. pm := process.GetManager()
  92. doerName := ""
  93. if doer != nil && doer.ID != -1 {
  94. doerName = doer.Name
  95. }
  96. ctx, _, finished := pm.AddContext(baseCtx, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "process", doerName))
  97. defer finished()
  98. if err := t.fun(ctx, doer, config); err != nil {
  99. var message string
  100. var status string
  101. if db.IsErrCancelled(err) {
  102. status = "cancelled"
  103. message = err.(db.ErrCancelled).Message
  104. } else {
  105. status = "error"
  106. message = err.Error()
  107. }
  108. t.lock.Lock()
  109. t.LastMessage = message
  110. t.Status = status
  111. t.LastDoer = doerName
  112. t.lock.Unlock()
  113. if err := system_model.CreateNotice(ctx, system_model.NoticeTask, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "cancelled", doerName, message)); err != nil {
  114. log.Error("CreateNotice: %v", err)
  115. }
  116. return
  117. }
  118. t.lock.Lock()
  119. t.Status = "finished"
  120. t.LastMessage = ""
  121. t.LastDoer = doerName
  122. t.lock.Unlock()
  123. if config.DoNoticeOnSuccess() {
  124. if err := system_model.CreateNotice(ctx, system_model.NoticeTask, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "finished", doerName)); err != nil {
  125. log.Error("CreateNotice: %v", err)
  126. }
  127. }
  128. })
  129. }
  130. // GetTask gets the named task
  131. func GetTask(name string) *Task {
  132. lock.Lock()
  133. defer lock.Unlock()
  134. log.Info("Getting %s in %v", name, tasksMap[name])
  135. return tasksMap[name]
  136. }
  137. // RegisterTask allows a task to be registered with the cron service
  138. func RegisterTask(name string, config Config, fun func(context.Context, *user_model.User, Config) error) error {
  139. log.Debug("Registering task: %s", name)
  140. i18nKey := "admin.dashboard." + name
  141. if value := translation.NewLocale("en-US").Tr(i18nKey); value == i18nKey {
  142. return fmt.Errorf("translation is missing for task %q, please add translation for %q", name, i18nKey)
  143. }
  144. _, err := setting.GetCronSettings(name, config)
  145. if err != nil {
  146. log.Error("Unable to register cron task with name: %s Error: %v", name, err)
  147. return err
  148. }
  149. task := &Task{
  150. Name: name,
  151. config: config,
  152. fun: fun,
  153. }
  154. lock.Lock()
  155. locked := true
  156. defer func() {
  157. if locked {
  158. lock.Unlock()
  159. }
  160. }()
  161. if _, has := tasksMap[task.Name]; has {
  162. log.Error("A task with this name: %s has already been registered", name)
  163. return fmt.Errorf("duplicate task with name: %s", task.Name)
  164. }
  165. if config.IsEnabled() {
  166. // We cannot use the entry return as there is no way to lock it
  167. if err := addTaskToScheduler(task); err != nil {
  168. return err
  169. }
  170. }
  171. tasks = append(tasks, task)
  172. tasksMap[task.Name] = task
  173. if started && config.IsEnabled() && config.DoRunAtStart() {
  174. lock.Unlock()
  175. locked = false
  176. task.Run()
  177. }
  178. return nil
  179. }
  180. // RegisterTaskFatal will register a task but if there is an error log.Fatal
  181. func RegisterTaskFatal(name string, config Config, fun func(context.Context, *user_model.User, Config) error) {
  182. if err := RegisterTask(name, config, fun); err != nil {
  183. log.Fatal("Unable to register cron task %s Error: %v", name, err)
  184. }
  185. }
  186. func addTaskToScheduler(task *Task) error {
  187. tags := []string{task.Name, task.config.GetSchedule()} // name and schedule can't be get from job, so we add them as tag
  188. if scheduleHasSeconds(task.config.GetSchedule()) {
  189. scheduler = scheduler.CronWithSeconds(task.config.GetSchedule())
  190. } else {
  191. scheduler = scheduler.Cron(task.config.GetSchedule())
  192. }
  193. if _, err := scheduler.Tag(tags...).Do(task.Run); err != nil {
  194. log.Error("Unable to register cron task with name: %s Error: %v", task.Name, err)
  195. return err
  196. }
  197. return nil
  198. }
  // scheduleHasSeconds reports whether schedule is made up of at least six
  // whitespace-separated fields, which is taken to mean that the cron
  // expression carries a seconds field.
  func scheduleHasSeconds(schedule string) bool {
  	const fieldsWithSeconds = 6
  	fieldCount := len(strings.Fields(schedule))
  	return fieldCount >= fieldsWithSeconds
  }