You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks.go 4.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package cron
  5. import (
  6. "context"
  7. "fmt"
  8. "reflect"
  9. "sync"
  10. "code.gitea.io/gitea/models"
  11. "code.gitea.io/gitea/modules/graceful"
  12. "code.gitea.io/gitea/modules/log"
  13. "code.gitea.io/gitea/modules/process"
  14. "code.gitea.io/gitea/modules/setting"
  15. )
  16. var lock = sync.Mutex{}
  17. var started = false
  18. var tasks = []*Task{}
  19. var tasksMap = map[string]*Task{}
  20. // Task represents a Cron task
  21. type Task struct {
  22. lock sync.Mutex
  23. Name string
  24. config Config
  25. fun func(context.Context, *models.User, Config) error
  26. ExecTimes int64
  27. }
  28. // DoRunAtStart returns if this task should run at the start
  29. func (t *Task) DoRunAtStart() bool {
  30. return t.config.DoRunAtStart()
  31. }
  32. // IsEnabled returns if this task is enabled as cron task
  33. func (t *Task) IsEnabled() bool {
  34. return t.config.IsEnabled()
  35. }
  36. // GetConfig will return a copy of the task's config
  37. func (t *Task) GetConfig() Config {
  38. if reflect.TypeOf(t.config).Kind() == reflect.Ptr {
  39. // Pointer:
  40. return reflect.New(reflect.ValueOf(t.config).Elem().Type()).Interface().(Config)
  41. }
  42. // Not pointer:
  43. return reflect.New(reflect.TypeOf(t.config)).Elem().Interface().(Config)
  44. }
  45. // Run will run the task incrementing the cron counter with no user defined
  46. func (t *Task) Run() {
  47. t.RunWithUser(&models.User{
  48. ID: -1,
  49. Name: "(Cron)",
  50. LowerName: "(cron)",
  51. }, t.config)
  52. }
  53. // RunWithUser will run the task incrementing the cron counter at the time with User
  54. func (t *Task) RunWithUser(doer *models.User, config Config) {
  55. if !taskStatusTable.StartIfNotRunning(t.Name) {
  56. return
  57. }
  58. t.lock.Lock()
  59. if config == nil {
  60. config = t.config
  61. }
  62. t.ExecTimes++
  63. t.lock.Unlock()
  64. defer func() {
  65. taskStatusTable.Stop(t.Name)
  66. if err := recover(); err != nil {
  67. // Recover a panic within the
  68. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  69. log.Error("PANIC whilst running task: %s Value: %v", t.Name, combinedErr)
  70. }
  71. }()
  72. graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) {
  73. ctx, cancel := context.WithCancel(baseCtx)
  74. defer cancel()
  75. pm := process.GetManager()
  76. pid := pm.Add(config.FormatMessage(t.Name, "process", doer), cancel)
  77. defer pm.Remove(pid)
  78. if err := t.fun(ctx, doer, config); err != nil {
  79. if models.IsErrCancelled(err) {
  80. message := err.(models.ErrCancelled).Message
  81. if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "aborted", doer, message)); err != nil {
  82. log.Error("CreateNotice: %v", err)
  83. }
  84. return
  85. }
  86. if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "error", doer, err)); err != nil {
  87. log.Error("CreateNotice: %v", err)
  88. }
  89. return
  90. }
  91. if config.DoNoticeOnSuccess() {
  92. if err := models.CreateNotice(models.NoticeTask, config.FormatMessage(t.Name, "finished", doer)); err != nil {
  93. log.Error("CreateNotice: %v", err)
  94. }
  95. }
  96. })
  97. }
  98. // GetTask gets the named task
  99. func GetTask(name string) *Task {
  100. lock.Lock()
  101. defer lock.Unlock()
  102. log.Info("Getting %s in %v", name, tasksMap[name])
  103. return tasksMap[name]
  104. }
  105. // RegisterTask allows a task to be registered with the cron service
  106. func RegisterTask(name string, config Config, fun func(context.Context, *models.User, Config) error) error {
  107. log.Debug("Registering task: %s", name)
  108. _, err := setting.GetCronSettings(name, config)
  109. if err != nil {
  110. log.Error("Unable to register cron task with name: %s Error: %v", name, err)
  111. return err
  112. }
  113. task := &Task{
  114. Name: name,
  115. config: config,
  116. fun: fun,
  117. }
  118. lock.Lock()
  119. locked := true
  120. defer func() {
  121. if locked {
  122. lock.Unlock()
  123. }
  124. }()
  125. if _, has := tasksMap[task.Name]; has {
  126. log.Error("A task with this name: %s has already been registered", name)
  127. return fmt.Errorf("duplicate task with name: %s", task.Name)
  128. }
  129. if config.IsEnabled() {
  130. // We cannot use the entry return as there is no way to lock it
  131. if _, err = c.AddJob(name, config.GetSchedule(), task); err != nil {
  132. log.Error("Unable to register cron task with name: %s Error: %v", name, err)
  133. return err
  134. }
  135. }
  136. tasks = append(tasks, task)
  137. tasksMap[task.Name] = task
  138. if started && config.IsEnabled() && config.DoRunAtStart() {
  139. lock.Unlock()
  140. locked = false
  141. task.Run()
  142. }
  143. return nil
  144. }
  145. // RegisterTaskFatal will register a task but if there is an error log.Fatal
  146. func RegisterTaskFatal(name string, config Config, fun func(context.Context, *models.User, Config) error) {
  147. if err := RegisterTask(name, config, fun); err != nil {
  148. log.Fatal("Unable to register cron task %s Error: %v", name, err)
  149. }
  150. }