diff options
Diffstat (limited to 'modules/task/task.go')
-rw-r--r-- | modules/task/task.go | 41 |
1 files changed, 18 insertions, 23 deletions
diff --git a/modules/task/task.go b/modules/task/task.go index 64744afe7a..416f0c696a 100644 --- a/modules/task/task.go +++ b/modules/task/task.go @@ -8,14 +8,15 @@ import ( "fmt" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations/base" - "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/structs" ) // taskQueue is a global queue of tasks -var taskQueue Queue +var taskQueue queue.Queue // Run a task func Run(t *models.Task) error { @@ -23,38 +24,32 @@ func Run(t *models.Task) error { case structs.TaskTypeMigrateRepo: return runMigrateTask(t) default: - return fmt.Errorf("Unknow task type: %d", t.Type) + return fmt.Errorf("Unknown task type: %d", t.Type) } } // Init will start the service to get all unfinished tasks and run them func Init() error { - switch setting.Task.QueueType { - case setting.ChannelQueueType: - taskQueue = NewChannelQueue(setting.Task.QueueLength) - case setting.RedisQueueType: - var err error - addrs, pass, idx, err := parseConnStr(setting.Task.QueueConnStr) - if err != nil { - return err - } - taskQueue, err = NewRedisQueue(addrs, pass, idx) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported task queue type: %v", setting.Task.QueueType) + taskQueue = queue.CreateQueue("task", handle, &models.Task{}) + + if taskQueue == nil { + return fmt.Errorf("Unable to create Task Queue") } - go func() { - if err := taskQueue.Run(); err != nil { - log.Error("taskQueue.Run end failed: %v", err) - } - }() + go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) return nil } +func handle(data ...queue.Data) { + for _, datum := range data { + task := datum.(*models.Task) + if err := Run(task); err != nil { + log.Error("Run task failed: %v", err) + } + } +} + // MigrateRepository add migration repository to task func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error { task, err := models.CreateMigrateTask(doer, u, opts) |