diff options
Diffstat (limited to 'modules/task')
-rw-r--r-- | modules/task/queue.go | 14 | ||||
-rw-r--r-- | modules/task/queue_channel.go | 48 | ||||
-rw-r--r-- | modules/task/queue_redis.go | 130 | ||||
-rw-r--r-- | modules/task/task.go | 41 |
4 files changed, 18 insertions, 215 deletions
diff --git a/modules/task/queue.go b/modules/task/queue.go deleted file mode 100644 index ddee0b3d46..0000000000 --- a/modules/task/queue.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2019 Gitea. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import "code.gitea.io/gitea/models" - -// Queue defines an interface to run task queue -type Queue interface { - Run() error - Push(*models.Task) error - Stop() -} diff --git a/modules/task/queue_channel.go b/modules/task/queue_channel.go deleted file mode 100644 index da541f4755..0000000000 --- a/modules/task/queue_channel.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import ( - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/log" -) - -var ( - _ Queue = &ChannelQueue{} -) - -// ChannelQueue implements -type ChannelQueue struct { - queue chan *models.Task -} - -// NewChannelQueue create a memory channel queue -func NewChannelQueue(queueLen int) *ChannelQueue { - return &ChannelQueue{ - queue: make(chan *models.Task, queueLen), - } -} - -// Run starts to run the queue -func (c *ChannelQueue) Run() error { - for task := range c.queue { - err := Run(task) - if err != nil { - log.Error("Run task failed: %s", err.Error()) - } - } - return nil -} - -// Push will push the task ID to queue -func (c *ChannelQueue) Push(task *models.Task) error { - c.queue <- task - return nil -} - -// Stop stop the queue -func (c *ChannelQueue) Stop() { - close(c.queue) -} diff --git a/modules/task/queue_redis.go b/modules/task/queue_redis.go deleted file mode 100644 index 127de0cdbf..0000000000 --- a/modules/task/queue_redis.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import ( - "encoding/json" - "errors" - "strconv" - "strings" - "time" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/log" - - "github.com/go-redis/redis" -) - -var ( - _ Queue = &RedisQueue{} -) - -type redisClient interface { - RPush(key string, args ...interface{}) *redis.IntCmd - LPop(key string) *redis.StringCmd - Ping() *redis.StatusCmd -} - -// RedisQueue redis queue -type RedisQueue struct { - client redisClient - queueName string - closeChan chan bool -} - -func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { - fields := strings.Fields(connStr) - for _, f := range fields { - items := strings.SplitN(f, "=", 2) - if len(items) < 2 { - continue - } - switch strings.ToLower(items[0]) { - case "addrs": - addrs = items[1] - case "password": - password = items[1] - case "db": - dbIdx, err = strconv.Atoi(items[1]) - if err != nil { - return - } - } - } - return -} - -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) { - dbs := strings.Split(addrs, ",") - var queue = RedisQueue{ - queueName: "task_queue", - closeChan: make(chan bool), - } - if len(dbs) == 0 { - return nil, errors.New("no redis host found") - } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: password, // no password set - DB: dbIdx, // use default DB - }) - } else { - // cluster will ignore db - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - Password: password, - }) - } - if err := queue.client.Ping().Err(); err != nil { - return nil, err - } - return &queue, nil -} - -// Run starts to run the queue -func (r *RedisQueue) Run() error { - for { - select { - case <-r.closeChan: - return nil - case <-time.After(time.Millisecond * 100): - } - - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil { - if err != redis.Nil { - log.Error("LPop failed: %v", err) - } - time.Sleep(time.Millisecond * 100) - continue - } - - var task models.Task - err = json.Unmarshal(bs, &task) - if err != nil { - log.Error("Unmarshal task failed: %s", err.Error()) - } else { - err = Run(&task) - if err != nil { - log.Error("Run task failed: %s", err.Error()) - } - } - } -} - -// Push implements Queue -func (r *RedisQueue) Push(task *models.Task) error { - bs, err := json.Marshal(task) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -} - -// Stop stop the queue -func (r *RedisQueue) Stop() { - r.closeChan <- true -} diff --git a/modules/task/task.go b/modules/task/task.go index 64744afe7a..416f0c696a 100644 --- a/modules/task/task.go +++ b/modules/task/task.go @@ -8,14 +8,15 @@ import ( "fmt" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations/base" - "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/structs" ) // taskQueue is a global queue of tasks -var taskQueue Queue +var taskQueue queue.Queue // Run a task func Run(t *models.Task) error { @@ -23,38 +24,32 @@ func Run(t *models.Task) error { case structs.TaskTypeMigrateRepo: return runMigrateTask(t) default: - return fmt.Errorf("Unknow task type: %d", t.Type) + return fmt.Errorf("Unknown task type: %d", t.Type) } } // Init will start the service to get all unfinished tasks and run them func Init() error { - switch setting.Task.QueueType { - case setting.ChannelQueueType: - taskQueue = NewChannelQueue(setting.Task.QueueLength) - case setting.RedisQueueType: - var err error - addrs, pass, idx, err := parseConnStr(setting.Task.QueueConnStr) - if err != nil { - return err - } - taskQueue, err = NewRedisQueue(addrs, pass, idx) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported task queue type: %v", setting.Task.QueueType) + taskQueue = queue.CreateQueue("task", handle, &models.Task{}) + + if taskQueue == nil { + return fmt.Errorf("Unable to create Task Queue") } - go func() { - if err := taskQueue.Run(); err != nil { - log.Error("taskQueue.Run end failed: %v", err) - } - }() + go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) return nil } +func handle(data ...queue.Data) { + for _, datum := range data { + task := datum.(*models.Task) + if err := Run(task); err != nil { + log.Error("Run task failed: %v", err) + } + } +} + // MigrateRepository add migration repository to task func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error { task, err := models.CreateMigrateTask(doer, u, opts) |