diff options
author | zeripath <art27@cantab.net> | 2020-01-07 11:23:09 +0000 |
---|---|---|
committer | Antoine GIRARD <sapk@users.noreply.github.com> | 2020-01-07 12:23:09 +0100 |
commit | 62eb1b0f2530a5ae1ce9b729378c0c8066174215 (patch) | |
tree | e567b2a9d91e69c0f2bccfeaf1a7341b4dda2706 /modules | |
parent | f71e1c8e796b099f4634bcd358e48189a97dcbad (diff) | |
download | gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.tar.gz gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.zip |
Graceful Queues: Issue Indexing and Tasks (#9363)
* Queue: Add generic graceful queues with settings
* Queue & Setting: Add worker pool implementation
* Queue: Add worker settings
* Queue: Make resizing worker pools
* Queue: Add name variable to queues
* Queue: Add monitoring
* Queue: Improve logging
* Issues: Gracefulise the issues indexer
Remove the old now unused specific queues
* Task: Move to generic queue and gracefulise
* Issues: Standardise the issues indexer queue settings
* Fix test
* Queue: Allow Redis to connect to unix
* Prevent deadlock during early shutdown of issue indexer
* Add MaxWorker settings to queues
* Merge branch 'master' into graceful-queues
* Update modules/indexer/issues/indexer.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/indexer/issues/indexer.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_channel.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_disk.go
* Update modules/queue/queue_disk_channel.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Rename queue.Description to queue.ManagedQueue as per @guillep2k
* Cancel pool workers when removed
* Remove dependency on queue from setting
* Update modules/queue/queue_redis.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* As per @guillep2k add mutex locks on shutdown/terminate
* move unlocking out of setInternal
* Add warning if number of workers < 0
* Small changes as per @guillep2k
* No redis host specified not found
* Clean up documentation for queues
* Update docs/content/doc/advanced/config-cheat-sheet.en-us.md
* Update modules/indexer/issues/indexer_test.go
* Ensure that persistable channel queue is added to manager
* Rename QUEUE_NAME REDIS_QUEUE_NAME
* Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME"
This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df.
Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: techknowlogick <matti@mdranta.net>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'modules')
27 files changed, 2453 insertions, 643 deletions
diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go index a758cfeaee..d0cca4fd18 100644 --- a/modules/indexer/issues/db.go +++ b/modules/indexer/issues/db.go @@ -25,6 +25,10 @@ func (db *DBIndexer) Delete(ids ...int64) error { return nil } +// Close dummy function +func (db *DBIndexer) Close() { +} + // Search dummy function func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index ebcd3f68dd..894f37a963 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,12 +5,16 @@ package issues import ( + "context" + "fmt" + "os" "sync" "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" ) @@ -44,12 +48,14 @@ type Indexer interface { Index(issue []*IndexerData) error Delete(ids ...int64) error Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) + Close() } type indexerHolder struct { - indexer Indexer - mutex sync.RWMutex - cond *sync.Cond + indexer Indexer + mutex sync.RWMutex + cond *sync.Cond + cancelled bool } func newIndexerHolder() *indexerHolder { @@ -58,6 +64,13 @@ func newIndexerHolder() *indexerHolder { return h } +func (h *indexerHolder) cancel() { + h.mutex.Lock() + defer h.mutex.Unlock() + h.cancelled = true + h.cond.Broadcast() +} + func (h *indexerHolder) set(indexer Indexer) { h.mutex.Lock() defer h.mutex.Unlock() @@ -68,16 +81,15 @@ func (h *indexerHolder) set(indexer Indexer) { func (h *indexerHolder) get() Indexer { h.mutex.RLock() defer h.mutex.RUnlock() - if h.indexer == nil { + if h.indexer == nil && !h.cancelled { h.cond.Wait() } return h.indexer } var ( - issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength) // issueIndexerQueue queue of issue ids to be updated - issueIndexerQueue Queue + issueIndexerQueue queue.Queue holder = newIndexerHolder() ) @@ -85,90 +97,99 @@ var ( // all issue index done. func InitIssueIndexer(syncReindex bool) { waitChannel := make(chan time.Duration) + + // Create the Queue + switch setting.Indexer.IssueType { + case "bleve": + handler := func(data ...queue.Data) { + indexer := holder.get() + if indexer == nil { + log.Error("Issue indexer handler: unable to get indexer!") + return + } + + iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber) + for _, datum := range data { + indexerData, ok := datum.(*IndexerData) + if !ok { + log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) + continue + } + log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) + if indexerData.IsDelete { + _ = indexer.Delete(indexerData.IDs...) + continue + } + iData = append(iData, indexerData) + } + if err := indexer.Index(iData); err != nil { + log.Error("Error whilst indexing: %v Error: %v", iData, err) + } + } + + issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) + + if issueIndexerQueue == nil { + log.Fatal("Unable to create issue indexer queue") + } + default: + issueIndexerQueue = &queue.DummyQueue{} + } + + // Create the Indexer go func() { start := time.Now() - log.Info("Initializing Issue Indexer") + log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType) var populate bool - var dummyQueue bool switch setting.Indexer.IssueType { case "bleve": - issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() - if err != nil { - log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) - } - populate = !exist - holder.set(issueIndexer) + graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) { + issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + holder.cancel() + log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) + } + populate = !exist + holder.set(issueIndexer) + atTerminate(context.Background(), func() { + log.Debug("Closing issue indexer") + issueIndexer := holder.get() + if issueIndexer != nil { + issueIndexer.Close() + } + log.Info("PID: %d Issue Indexer closed", os.Getpid()) + }) + log.Debug("Created Bleve Indexer") + }) case "db": issueIndexer := &DBIndexer{} holder.set(issueIndexer) - dummyQueue = true default: + holder.cancel() log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } - if dummyQueue { - issueIndexerQueue = &DummyQueue{} - } else { - var err error - switch setting.Indexer.IssueQueueType { - case setting.LevelQueueType: - issueIndexerQueue, err = NewLevelQueue( - holder.get(), - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber) - if err != nil { - log.Fatal( - "Unable create level queue for issue queue dir: %s batch number: %d : %v", - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber, - err) - } - case setting.ChannelQueueType: - issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber) - case setting.RedisQueueType: - addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) - if err != nil { - log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", - setting.Indexer.IssueQueueConnStr, - err) - } - issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber) - if err != nil { - log.Fatal("Unable to create RedisQueue: %s : %v", - setting.Indexer.IssueQueueConnStr, - err) - } - default: - log.Fatal("Unsupported indexer queue type: %v", - setting.Indexer.IssueQueueType) - } - - go func() { - err = issueIndexerQueue.Run() - if err != nil { - log.Error("issueIndexerQueue.Run: %v", err) - } - }() - } - - go func() { - for data := range issueIndexerChannel { - _ = issueIndexerQueue.Push(data) - } - }() + // Start processing the queue + go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run) + // Populate the index if populate { if syncReindex { - populateIssueIndexer() + graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } else { - go populateIssueIndexer() + go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } } waitChannel <- time.Since(start) + close(waitChannel) }() + if syncReindex { - <-waitChannel + select { + case <-waitChannel: + case <-graceful.GetManager().IsShutdown(): + } } else if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout @@ -178,7 +199,12 @@ func InitIssueIndexer(syncReindex bool) { select { case duration := <-waitChannel: log.Info("Issue Indexer Initialization took %v", duration) + case <-graceful.GetManager().IsShutdown(): + log.Warn("Shutdown occurred before issue index initialisation was complete") case <-time.After(timeout): + if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok { + shutdownable.Terminate() + } log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) } }() @@ -186,8 +212,14 @@ func InitIssueIndexer(syncReindex bool) { } // populateIssueIndexer populate the issue indexer with issue data -func populateIssueIndexer() { +func populateIssueIndexer(ctx context.Context) { for page := 1; ; page++ { + select { + case <-ctx.Done(): + log.Warn("Issue Indexer population shutdown before completion") + return + default: + } repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{ Page: page, PageSize: models.RepositoryListDefaultPageSize, @@ -200,10 +232,17 @@ func populateIssueIndexer() { continue } if len(repos) == 0 { + log.Debug("Issue Indexer population complete") return } for _, repo := range repos { + select { + case <-ctx.Done(): + log.Info("Issue Indexer population shutdown before completion") + return + default: + } UpdateRepoIndexer(repo) } } @@ -237,13 +276,17 @@ func UpdateIssueIndexer(issue *models.Issue) { comments = append(comments, comment.Content) } } - issueIndexerChannel <- &IndexerData{ + indexerData := &IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, Content: issue.Content, Comments: comments, } + log.Debug("Adding to channel: %v", indexerData) + if err := issueIndexerQueue.Push(indexerData); err != nil { + log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err) + } } // DeleteRepoIssueIndexer deletes repo's all issues indexes @@ -258,17 +301,25 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { if len(ids) == 0 { return } - - issueIndexerChannel <- &IndexerData{ + indexerData := &IndexerData{ IDs: ids, IsDelete: true, } + if err := issueIndexerQueue.Push(indexerData); err != nil { + log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err) + } } // SearchIssuesByKeyword search issue ids by keywords and repo id func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) { var issueIDs []int64 - res, err := holder.get().Search(keyword, repoIDs, 1000, 0) + indexer := holder.get() + + if indexer == nil { + log.Error("SearchIssuesByKeyword(): unable to get indexer!") + return nil, fmt.Errorf("unable to get issue indexer") + } + res, err := indexer.Search(keyword, repoIDs, 1000, 0) if err != nil { return nil, err } diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go index ca7ba29703..4028a6c8b5 100644 --- a/modules/indexer/issues/indexer_test.go +++ b/modules/indexer/issues/indexer_test.go @@ -15,6 +15,8 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/setting" + "gopkg.in/ini.v1" + "github.com/stretchr/testify/assert" ) @@ -24,6 +26,7 @@ func TestMain(m *testing.M) { func TestBleveSearchIssues(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) + setting.Cfg = ini.Empty() tmpIndexerDir, err := ioutil.TempDir("", "issues-indexer") if err != nil { @@ -41,6 +44,7 @@ func TestBleveSearchIssues(t *testing.T) { }() setting.Indexer.IssueType = "bleve" + setting.NewQueueService() InitIssueIndexer(true) defer func() { indexer := holder.get() diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go deleted file mode 100644 index f93e5c47a4..0000000000 --- a/modules/indexer/issues/queue.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2018 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -// Queue defines an interface to save an issue indexer queue -type Queue interface { - Run() error - Push(*IndexerData) error -} - -// DummyQueue represents an empty queue -type DummyQueue struct { -} - -// Run starts to run the queue -func (b *DummyQueue) Run() error { - return nil -} - -// Push pushes data to indexer -func (b *DummyQueue) Push(*IndexerData) error { - return nil -} diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go deleted file mode 100644 index b6458d3eb5..0000000000 --- a/modules/indexer/issues/queue_channel.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2018 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -import ( - "time" - - "code.gitea.io/gitea/modules/setting" -) - -// ChannelQueue implements -type ChannelQueue struct { - queue chan *IndexerData - indexer Indexer - batchNumber int -} - -// NewChannelQueue create a memory channel queue -func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue { - return &ChannelQueue{ - queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength), - indexer: indexer, - batchNumber: batchNumber, - } -} - -// Run starts to run the queue -func (c *ChannelQueue) Run() error { - var i int - var datas = make([]*IndexerData, 0, c.batchNumber) - for { - select { - case data := <-c.queue: - if data.IsDelete { - _ = c.indexer.Delete(data.IDs...) - continue - } - - datas = append(datas, data) - if len(datas) >= c.batchNumber { - _ = c.indexer.Index(datas) - // TODO: save the point - datas = make([]*IndexerData, 0, c.batchNumber) - } - case <-time.After(time.Millisecond * 100): - i++ - if i >= 3 && len(datas) > 0 { - _ = c.indexer.Index(datas) - // TODO: save the point - datas = make([]*IndexerData, 0, c.batchNumber) - } - } - } -} - -// Push will push the indexer data to queue -func (c *ChannelQueue) Push(data *IndexerData) error { - c.queue <- data - return nil -} diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go deleted file mode 100644 index d6187f2acb..0000000000 --- a/modules/indexer/issues/queue_disk.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -import ( - "encoding/json" - "time" - - "code.gitea.io/gitea/modules/log" - - "gitea.com/lunny/levelqueue" -) - -var ( - _ Queue = &LevelQueue{} -) - -// LevelQueue implements a disk library queue -type LevelQueue struct { - indexer Indexer - queue *levelqueue.Queue - batchNumber int -} - -// NewLevelQueue creates a ledis local queue -func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) { - queue, err := levelqueue.Open(dataDir) - if err != nil { - return nil, err - } - - return &LevelQueue{ - indexer: indexer, - queue: queue, - batchNumber: batchNumber, - }, nil -} - -// Run starts to run the queue -func (l *LevelQueue) Run() error { - var i int - var datas = make([]*IndexerData, 0, l.batchNumber) - for { - i++ - if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { - _ = l.indexer.Index(datas) - datas = make([]*IndexerData, 0, l.batchNumber) - i = 0 - continue - } - - bs, err := l.queue.RPop() - if err != nil { - if err != levelqueue.ErrNotFound { - log.Error("RPop: %v", err) - } - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - time.Sleep(time.Millisecond * 100) - continue - } - - var data IndexerData - err = json.Unmarshal(bs, &data) - if err != nil { - log.Error("Unmarshal: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("LevelQueue: task found: %#v", data) - - if data.IsDelete { - if data.ID > 0 { - if err = l.indexer.Delete(data.ID); err != nil { - log.Error("indexer.Delete: %v", err) - } - } else if len(data.IDs) > 0 { - if err = l.indexer.Delete(data.IDs...); err != nil { - log.Error("indexer.Delete: %v", err) - } - } - time.Sleep(time.Millisecond * 10) - continue - } - - datas = append(datas, &data) - time.Sleep(time.Millisecond * 10) - } -} - -// Push will push the indexer data to queue -func (l *LevelQueue) Push(data *IndexerData) error { - bs, err := json.Marshal(data) - if err != nil { - return err - } - return l.queue.LPush(bs) -} diff --git a/modules/indexer/issues/queue_redis.go b/modules/indexer/issues/queue_redis.go deleted file mode 100644 index 0344d3c87a..0000000000 --- a/modules/indexer/issues/queue_redis.go +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package issues - -import ( - "encoding/json" - "errors" - "strconv" - "strings" - "time" - - "code.gitea.io/gitea/modules/log" - - "github.com/go-redis/redis" -) - -var ( - _ Queue = &RedisQueue{} -) - -type redisClient interface { - RPush(key string, args ...interface{}) *redis.IntCmd - LPop(key string) *redis.StringCmd - Ping() *redis.StatusCmd -} - -// RedisQueue redis queue -type RedisQueue struct { - client redisClient - queueName string - indexer Indexer - batchNumber int -} - -func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { - fields := strings.Fields(connStr) - for _, f := range fields { - items := strings.SplitN(f, "=", 2) - if len(items) < 2 { - continue - } - switch strings.ToLower(items[0]) { - case "addrs": - addrs = items[1] - case "password": - password = items[1] - case "db": - dbIdx, err = strconv.Atoi(items[1]) - if err != nil { - return - } - } - } - return -} - -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) { - dbs := strings.Split(addrs, ",") - var queue = RedisQueue{ - queueName: "issue_indexer_queue", - indexer: indexer, - batchNumber: batchNumber, - } - if len(dbs) == 0 { - return nil, errors.New("no redis host found") - } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: password, // no password set - DB: dbIdx, // use default DB - }) - } else { - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - }) - } - if err := queue.client.Ping().Err(); err != nil { - return nil, err - } - return &queue, nil -} - -// Run runs the redis queue -func (r *RedisQueue) Run() error { - var i int - var datas = make([]*IndexerData, 0, r.batchNumber) - for { - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil && err != redis.Nil { - log.Error("LPop faile: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - i++ - if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) { - _ = r.indexer.Index(datas) - datas = make([]*IndexerData, 0, r.batchNumber) - i = 0 - } - - if len(bs) == 0 { - time.Sleep(time.Millisecond * 100) - continue - } - - var data IndexerData - err = json.Unmarshal(bs, &data) - if err != nil { - log.Error("Unmarshal: %v", err) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("RedisQueue: task found: %#v", data) - - if data.IsDelete { - if data.ID > 0 { - if err = r.indexer.Delete(data.ID); err != nil { - log.Error("indexer.Delete: %v", err) - } - } else if len(data.IDs) > 0 { - if err = r.indexer.Delete(data.IDs...); err != nil { - log.Error("indexer.Delete: %v", err) - } - } - time.Sleep(time.Millisecond * 100) - continue - } - - datas = append(datas, &data) - time.Sleep(time.Millisecond * 100) - } -} - -// Push implements Queue -func (r *RedisQueue) Push(data *IndexerData) error { - bs, err := json.Marshal(data) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -} diff --git a/modules/queue/manager.go b/modules/queue/manager.go new file mode 100644 index 0000000000..88b2644848 --- /dev/null +++ b/modules/queue/manager.go @@ -0,0 +1,270 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "sort" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +var manager *Manager + +// Manager is a queue manager +type Manager struct { + mutex sync.Mutex + + counter int64 + Queues map[int64]*ManagedQueue +} + +// ManagedQueue represents a working queue inheriting from Gitea. +type ManagedQueue struct { + mutex sync.Mutex + QID int64 + Queue Queue + Type Type + Name string + Configuration interface{} + ExemplarType string + Pool ManagedPool + counter int64 + PoolWorkers map[int64]*PoolWorkers +} + +// ManagedPool is a simple interface to get certain details from a worker pool +type ManagedPool interface { + AddWorkers(number int, timeout time.Duration) context.CancelFunc + NumberOfWorkers() int + MaxNumberOfWorkers() int + SetMaxNumberOfWorkers(int) + BoostTimeout() time.Duration + BlockTimeout() time.Duration + BoostWorkers() int + SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) +} + +// ManagedQueueList implements the sort.Interface +type ManagedQueueList []*ManagedQueue + +// PoolWorkers represents a working queue inheriting from Gitea. +type PoolWorkers struct { + PID int64 + Workers int + Start time.Time + Timeout time.Time + HasTimeout bool + Cancel context.CancelFunc +} + +// PoolWorkersList implements the sort.Interface +type PoolWorkersList []*PoolWorkers + +func init() { + _ = GetManager() +} + +// GetManager returns a Manager and initializes one as singleton if there's none yet +func GetManager() *Manager { + if manager == nil { + manager = &Manager{ + Queues: make(map[int64]*ManagedQueue), + } + } + return manager +} + +// Add adds a queue to this manager +func (m *Manager) Add(queue Queue, + t Type, + configuration, + exemplar interface{}, + pool ManagedPool) int64 { + + cfg, _ := json.Marshal(configuration) + mq := &ManagedQueue{ + Queue: queue, + Type: t, + Configuration: string(cfg), + ExemplarType: reflect.TypeOf(exemplar).String(), + PoolWorkers: make(map[int64]*PoolWorkers), + Pool: pool, + } + m.mutex.Lock() + m.counter++ + mq.QID = m.counter + mq.Name = fmt.Sprintf("queue-%d", mq.QID) + if named, ok := queue.(Named); ok { + mq.Name = named.Name() + } + m.Queues[mq.QID] = mq + m.mutex.Unlock() + log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID) + return mq.QID +} + +// Remove a queue from the Manager +func (m *Manager) Remove(qid int64) { + m.mutex.Lock() + delete(m.Queues, qid) + m.mutex.Unlock() + log.Trace("Queue Manager removed: QID: %d", qid) + +} + +// GetManagedQueue by qid +func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.Queues[qid] +} + +// ManagedQueues returns the managed queues +func (m *Manager) ManagedQueues() []*ManagedQueue { + m.mutex.Lock() + mqs := make([]*ManagedQueue, 0, len(m.Queues)) + for _, mq := range m.Queues { + mqs = append(mqs, mq) + } + m.mutex.Unlock() + sort.Sort(ManagedQueueList(mqs)) + return mqs +} + +// Workers returns the poolworkers +func (q *ManagedQueue) Workers() []*PoolWorkers { + q.mutex.Lock() + workers := make([]*PoolWorkers, 0, len(q.PoolWorkers)) + for _, worker := range q.PoolWorkers { + workers = append(workers, worker) + } + q.mutex.Unlock() + sort.Sort(PoolWorkersList(workers)) + return workers +} + +// RegisterWorkers registers workers to this queue +func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 { + q.mutex.Lock() + defer q.mutex.Unlock() + q.counter++ + q.PoolWorkers[q.counter] = &PoolWorkers{ + PID: q.counter, + Workers: number, + Start: start, + Timeout: timeout, + HasTimeout: hasTimeout, + Cancel: cancel, + } + return q.counter +} + +// CancelWorkers cancels pooled workers with pid +func (q *ManagedQueue) CancelWorkers(pid int64) { + q.mutex.Lock() + pw, ok := q.PoolWorkers[pid] + q.mutex.Unlock() + if !ok { + return + } + pw.Cancel() +} + +// RemoveWorkers deletes pooled workers with pid +func (q *ManagedQueue) RemoveWorkers(pid int64) { + q.mutex.Lock() + pw, ok := q.PoolWorkers[pid] + delete(q.PoolWorkers, pid) + q.mutex.Unlock() + if ok && pw.Cancel != nil { + pw.Cancel() + } +} + +// AddWorkers adds workers to the queue if it has registered an add worker function +func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc { + if q.Pool != nil { + // the cancel will be added to the pool workers description above + return q.Pool.AddWorkers(number, timeout) + } + return nil +} + +// NumberOfWorkers returns the number of workers in the queue +func (q *ManagedQueue) NumberOfWorkers() int { + if q.Pool != nil { + return q.Pool.NumberOfWorkers() + } + return -1 +} + +// MaxNumberOfWorkers returns the maximum number of workers for the pool +func (q *ManagedQueue) MaxNumberOfWorkers() int { + if q.Pool != nil { + return q.Pool.MaxNumberOfWorkers() + } + return 0 +} + +// BoostWorkers returns the number of workers for a boost +func (q *ManagedQueue) BoostWorkers() int { + if q.Pool != nil { + return q.Pool.BoostWorkers() + } + return -1 +} + +// BoostTimeout returns the timeout of the next boost +func (q *ManagedQueue) BoostTimeout() time.Duration { + if q.Pool != nil { + return q.Pool.BoostTimeout() + } + return 0 +} + +// BlockTimeout returns the timeout til the next boost +func (q *ManagedQueue) BlockTimeout() time.Duration { + if q.Pool != nil { + return q.Pool.BlockTimeout() + } + return 0 +} + +// SetSettings sets the setable boost values +func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { + if q.Pool != nil { + q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout) + } +} + +func (l ManagedQueueList) Len() int { + return len(l) +} + +func (l ManagedQueueList) Less(i, j int) bool { + return l[i].Name < l[j].Name +} + +func (l ManagedQueueList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + +func (l PoolWorkersList) Len() int { + return len(l) +} + +func (l PoolWorkersList) Less(i, j int) bool { + return l[i].Start.Before(l[j].Start) +} + +func (l PoolWorkersList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} diff --git a/modules/queue/queue.go b/modules/queue/queue.go new file mode 100644 index 0000000000..d458a7d506 --- /dev/null +++ b/modules/queue/queue.go @@ -0,0 +1,133 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "reflect" +) + +// ErrInvalidConfiguration is called when there is invalid configuration for a queue +type ErrInvalidConfiguration struct { + cfg interface{} + err error +} + +func (err ErrInvalidConfiguration) Error() string { + if err.err != nil { + return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err) + } + return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg) +} + +// IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration +func IsErrInvalidConfiguration(err error) bool { + _, ok := err.(ErrInvalidConfiguration) + return ok +} + +// Type is a type of Queue +type Type string + +// Data defines an type of queuable data +type Data interface{} + +// HandlerFunc is a function that takes a variable amount of data and processes it +type HandlerFunc func(...Data) + +// NewQueueFunc is a function that creates a queue +type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error) + +// Shutdownable represents a queue that can be shutdown +type Shutdownable interface { + Shutdown() + Terminate() +} + +// Named represents a queue with a name +type Named interface { + Name() string +} + +// Queue defines an interface to save an issue indexer queue +type Queue interface { + Run(atShutdown, atTerminate func(context.Context, func())) + Push(Data) error +} + +// DummyQueueType is the type for the dummy queue +const DummyQueueType Type = "dummy" + +// NewDummyQueue creates a new DummyQueue +func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) { + return &DummyQueue{}, nil +} + +// DummyQueue represents an empty queue +type DummyQueue struct { +} + +// Run starts to run the queue +func (b *DummyQueue) Run(_, _ func(context.Context, func())) {} + +// Push pushes data to the queue +func (b *DummyQueue) Push(Data) error { + return nil +} + +func toConfig(exemplar, cfg interface{}) (interface{}, error) { + if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { + return cfg, nil + } + + configBytes, ok := cfg.([]byte) + if !ok { + configStr, ok := cfg.(string) + if !ok { + return nil, ErrInvalidConfiguration{cfg: cfg} + } + configBytes = []byte(configStr) + } + newVal := reflect.New(reflect.TypeOf(exemplar)) + if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { + return nil, ErrInvalidConfiguration{cfg: cfg, err: err} + } + return newVal.Elem().Interface(), nil +} + +var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue} + +// RegisteredTypes provides the list of requested types of queues +func RegisteredTypes() []Type { + types := make([]Type, len(queuesMap)) + i := 0 + for key := range queuesMap { + types[i] = key + i++ + } + return types +} + +// RegisteredTypesAsString provides the list of requested types of queues +func RegisteredTypesAsString() []string { + types := make([]string, len(queuesMap)) + i := 0 + for key := range queuesMap { + types[i] = string(key) + i++ + } + return types +} + +// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error +func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) { + newFn, ok := queuesMap[queueType] + if !ok { + return nil, fmt.Errorf("Unsupported queue type: %v", queueType) + } + return newFn(handlerFunc, opts, exemplar) +} diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go new file mode 100644 index 0000000000..c8f8a53804 --- /dev/null +++ b/modules/queue/queue_channel.go @@ -0,0 +1,106 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "fmt" + "reflect" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// ChannelQueueType is the type for channel queue +const ChannelQueueType Type = "channel" + +// ChannelQueueConfiguration is the configuration for a ChannelQueue +type ChannelQueueConfiguration struct { + QueueLength int + BatchLength int + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int + Name string +} + +// ChannelQueue implements +type ChannelQueue struct { + pool *WorkerPool + exemplar interface{} + workers int + name string +} + +// NewChannelQueue create a memory channel queue +func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ChannelQueueConfiguration) + if config.BatchLength == 0 { + config.BatchLength = 1 + } + dataChan := make(chan Data, config.QueueLength) + + ctx, cancel := context.WithCancel(context.Background()) + queue := &ChannelQueue{ + pool: &WorkerPool{ + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + maxNumberOfWorkers: config.MaxWorkers, + }, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + } + queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool) + return queue, nil +} + +// Run starts to run the queue +func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), func() { + log.Warn("ChannelQueue: %s is not shutdownable!", c.name) + }) + atTerminate(context.Background(), func() { + log.Warn("ChannelQueue: %s is not terminatable!", c.name) + }) + go func() { + _ = c.pool.AddWorkers(c.workers, 0) + }() +} + +// Push will push data into the queue +func (c *ChannelQueue) Push(data Data) error { + if c.exemplar != nil { + // Assert data is of same type as r.exemplar + t := reflect.TypeOf(data) + exemplarType := reflect.TypeOf(c.exemplar) + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) + } + } + c.pool.Push(data) + return nil +} + +// Name returns the name of this queue +func (c *ChannelQueue) Name() string { + return c.name +} + +func init() { + queuesMap[ChannelQueueType] = NewChannelQueue +} diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go new file mode 100644 index 0000000000..fafc1e3303 --- /dev/null +++ b/modules/queue/queue_channel_test.go @@ -0,0 +1,91 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestChannelQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + nilFn := func(_ context.Context, _ func()) {} + + queue, err := NewChannelQueue(handle, + ChannelQueueConfiguration{ + QueueLength: 20, + Workers: 1, + MaxWorkers: 10, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + go queue.Push(&test1) + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} + +func TestChannelQueue_Batch(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + assert.True(t, len(data) == 2) + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + nilFn := func(_ context.Context, _ func()) {} + + queue, err := NewChannelQueue(handle, + ChannelQueueConfiguration{ + QueueLength: 20, + BatchLength: 2, + Workers: 1, + MaxWorkers: 10, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + queue.Push(&test1) + go queue.Push(&test2) + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go new file mode 100644 index 0000000000..98e7b24e42 --- /dev/null +++ b/modules/queue/queue_disk.go @@ -0,0 +1,213 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" + + "gitea.com/lunny/levelqueue" +) + +// LevelQueueType is the type for level queue +const LevelQueueType Type = "level" + +// LevelQueueConfiguration is the configuration for a LevelQueue +type LevelQueueConfiguration struct { + DataDir string + QueueLength int + BatchLength int + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int + Name string +} + +// LevelQueue implements a disk library queue +type LevelQueue struct { + pool *WorkerPool + queue *levelqueue.Queue + closed chan struct{} + terminated chan struct{} + lock sync.Mutex + exemplar interface{} + workers int + name string +} + +// NewLevelQueue creates a ledis local queue +func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(LevelQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(LevelQueueConfiguration) + + internal, err := levelqueue.Open(config.DataDir) + if err != nil { + return nil, err + } + + dataChan := make(chan Data, config.QueueLength) + ctx, cancel := context.WithCancel(context.Background()) + + queue := &LevelQueue{ + pool: &WorkerPool{ + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + maxNumberOfWorkers: config.MaxWorkers, + }, + queue: internal, + exemplar: exemplar, + closed: make(chan struct{}), + terminated: make(chan struct{}), + workers: config.Workers, + name: config.Name, + } + queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool) + return queue, nil +} + +// Run starts to run the queue +func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), l.Shutdown) + atTerminate(context.Background(), l.Terminate) + + go func() { + _ = l.pool.AddWorkers(l.workers, 0) + }() + + go l.readToChan() + + log.Trace("LevelQueue: %s Waiting til closed", l.name) + <-l.closed + + log.Trace("LevelQueue: %s Waiting til done", l.name) + l.pool.Wait() + + log.Trace("LevelQueue: %s Waiting til cleaned", l.name) + ctx, cancel := context.WithCancel(context.Background()) + atTerminate(ctx, cancel) + l.pool.CleanUp(ctx) + cancel() + log.Trace("LevelQueue: %s Cleaned", l.name) + +} + +func (l *LevelQueue) readToChan() { + for { + select { + case <-l.closed: + // tell the pool to shutdown. + l.pool.cancel() + return + default: + bs, err := l.queue.RPop() + if err != nil { + if err != levelqueue.ErrNotFound { + log.Error("LevelQueue: %s Error on RPop: %v", l.name, err) + } + time.Sleep(time.Millisecond * 100) + continue + } + + if len(bs) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data Data + if l.exemplar != nil { + t := reflect.TypeOf(l.exemplar) + n := reflect.New(t) + ne := n.Elem() + err = json.Unmarshal(bs, ne.Addr().Interface()) + data = ne.Interface().(Data) + } else { + err = json.Unmarshal(bs, &data) + } + if err != nil { + log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("LevelQueue %s: Task found: %#v", l.name, data) + l.pool.Push(data) + + } + } +} + +// Push will push the indexer data to queue +func (l *LevelQueue) Push(data Data) error { + if l.exemplar != nil { + // Assert data is of same type as r.exemplar + value := reflect.ValueOf(data) + t := value.Type() + exemplarType := reflect.ValueOf(l.exemplar).Type() + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) + } + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + return l.queue.LPush(bs) +} + +// Shutdown this queue and stop processing +func (l *LevelQueue) Shutdown() { + l.lock.Lock() + defer l.lock.Unlock() + log.Trace("LevelQueue: %s Shutdown", l.name) + select { + case <-l.closed: + default: + close(l.closed) + } +} + +// Terminate this queue and close the queue +func (l *LevelQueue) Terminate() { + log.Trace("LevelQueue: %s Terminating", l.name) + l.Shutdown() + l.lock.Lock() + select { + case <-l.terminated: + l.lock.Unlock() + default: + close(l.terminated) + l.lock.Unlock() + if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { + log.Error("Error whilst closing internal queue in %s: %v", l.name, err) + } + + } +} + +// Name returns the name of this queue +func (l *LevelQueue) Name() string { + return l.name +} + +func init() { + queuesMap[LevelQueueType] = NewLevelQueue +} diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go new file mode 100644 index 0000000000..895c8ce918 --- /dev/null +++ b/modules/queue/queue_disk_channel.go @@ -0,0 +1,193 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// PersistableChannelQueueType is the type for persistable queue +const PersistableChannelQueueType Type = "persistable-channel" + +// PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue +type PersistableChannelQueueConfiguration struct { + Name string + DataDir string + BatchLength int + QueueLength int + Timeout time.Duration + MaxAttempts int + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int +} + +// PersistableChannelQueue wraps a channel queue and level queue together +type PersistableChannelQueue struct { + *ChannelQueue + delayedStarter + closed chan struct{} +} + +// NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down +// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate +func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(PersistableChannelQueueConfiguration) + + channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + Workers: config.Workers, + MaxWorkers: config.MaxWorkers, + BlockTimeout: config.BlockTimeout, + BoostTimeout: config.BoostTimeout, + BoostWorkers: config.BoostWorkers, + Name: config.Name + "-channel", + }, exemplar) + if err != nil { + return nil, err + } + + // the level backend only needs temporary workers to catch up with the previously dropped work + levelCfg := LevelQueueConfiguration{ + DataDir: config.DataDir, + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + Workers: 1, + MaxWorkers: 6, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + Name: config.Name + "-level", + } + + levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) + if err == nil { + queue := &PersistableChannelQueue{ + ChannelQueue: channelQueue.(*ChannelQueue), + delayedStarter: delayedStarter{ + internal: levelQueue.(*LevelQueue), + name: config.Name, + }, + closed: make(chan struct{}), + } + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil) + return queue, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + queue := &PersistableChannelQueue{ + ChannelQueue: channelQueue.(*ChannelQueue), + delayedStarter: delayedStarter{ + cfg: levelCfg, + underlying: LevelQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, + }, + closed: make(chan struct{}), + } + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil) + return queue, nil +} + +// Name returns the name of this queue +func (p *PersistableChannelQueue) Name() string { + return p.delayedStarter.name +} + +// Push will push the indexer data to queue +func (p *PersistableChannelQueue) Push(data Data) error { + select { + case <-p.closed: + return p.internal.Push(data) + default: + return p.ChannelQueue.Push(data) + } +} + +// Run starts to run the queue +func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + p.lock.Lock() + if p.internal == nil { + err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar) + p.lock.Unlock() + if err != nil { + log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err) + return + } + } else { + p.lock.Unlock() + } + atShutdown(context.Background(), p.Shutdown) + atTerminate(context.Background(), p.Terminate) + + // Just run the level queue - we shut it down later + go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + + go func() { + _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0) + }() + + log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name) + <-p.closed + log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name) + p.ChannelQueue.pool.cancel() + p.internal.(*LevelQueue).pool.cancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name) + p.ChannelQueue.pool.Wait() + p.internal.(*LevelQueue).pool.Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name) + for data := range p.ChannelQueue.pool.dataChan { + _ = p.internal.Push(data) + } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name) + }() + log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name) +} + +// Shutdown processing this queue +func (p *PersistableChannelQueue) Shutdown() { + log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) + select { + case <-p.closed: + default: + p.lock.Lock() + defer p.lock.Unlock() + if p.internal != nil { + p.internal.(*LevelQueue).Shutdown() + } + close(p.closed) + } +} + +// Terminate this queue and close the queue +func (p *PersistableChannelQueue) Terminate() { + log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name) + p.Shutdown() + p.lock.Lock() + defer p.lock.Unlock() + if p.internal != nil { + p.internal.(*LevelQueue).Terminate() + } +} + +func init() { + queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue +} diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go new file mode 100644 index 0000000000..4ef68961c6 --- /dev/null +++ b/modules/queue/queue_disk_channel_test.go @@ -0,0 +1,117 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPersistableChannelQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + assert.True(t, len(data) == 2) + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + queueShutdown := []func(){} + queueTerminate := []func(){} + + tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-test-data") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + + queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + MaxWorkers: 10, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(_ context.Context, terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + go func() { + err = queue.Push(&test2) + assert.NoError(t, err) + }() + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) + + for _, callback := range queueShutdown { + callback() + } + time.Sleep(200 * time.Millisecond) + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + for _, callback := range queueTerminate { + callback() + } + + // Reopen queue + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + MaxWorkers: 10, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(_ context.Context, terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + + result3 := <-handleChan + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + result4 := <-handleChan + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + for _, callback := range queueShutdown { + callback() + } + for _, callback := range queueTerminate { + callback() + } + +} diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go new file mode 100644 index 0000000000..c5959d606f --- /dev/null +++ b/modules/queue/queue_disk_test.go @@ -0,0 +1,126 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLevelQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) { + assert.True(t, len(data) == 2) + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + } + + queueShutdown := []func(){} + queueTerminate := []func(){} + + tmpDir, err := ioutil.TempDir("", "level-queue-test-data") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + + queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + Workers: 1, + MaxWorkers: 10, + QueueLength: 20, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(_ context.Context, terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + go func() { + err = queue.Push(&test2) + assert.NoError(t, err) + }() + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) + + for _, callback := range queueShutdown { + callback() + } + time.Sleep(200 * time.Millisecond) + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + for _, callback := range queueTerminate { + callback() + } + + // Reopen queue + queue, err = NewWrappedQueue(handle, + WrappedQueueConfiguration{ + Underlying: LevelQueueType, + Config: LevelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + Workers: 1, + MaxWorkers: 10, + QueueLength: 20, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(_ context.Context, shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(_ context.Context, terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + + result3 := <-handleChan + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + result4 := <-handleChan + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + for _, callback := range queueShutdown { + callback() + } + for _, callback := range queueTerminate { + callback() + } +} diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go new file mode 100644 index 0000000000..14e68937a5 --- /dev/null +++ b/modules/queue/queue_redis.go @@ -0,0 +1,234 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" + + "github.com/go-redis/redis" +) + +// RedisQueueType is the type for redis queue +const RedisQueueType Type = "redis" + +type redisClient interface { + RPush(key string, args ...interface{}) *redis.IntCmd + LPop(key string) *redis.StringCmd + Ping() *redis.StatusCmd + Close() error +} + +// RedisQueue redis queue +type RedisQueue struct { + pool *WorkerPool + client redisClient + queueName string + closed chan struct{} + terminated chan struct{} + exemplar interface{} + workers int + name string + lock sync.Mutex +} + +// RedisQueueConfiguration is the configuration for the redis queue +type RedisQueueConfiguration struct { + Network string + Addresses string + Password string + DBIndex int + BatchLength int + QueueLength int + QueueName string + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int + Name string +} + +// NewRedisQueue creates single redis or cluster redis queue +func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(RedisQueueConfiguration) + + dbs := strings.Split(config.Addresses, ",") + + dataChan := make(chan Data, config.QueueLength) + ctx, cancel := context.WithCancel(context.Background()) + + var queue = &RedisQueue{ + pool: &WorkerPool{ + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + handle: handle, + dataChan: dataChan, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + maxNumberOfWorkers: config.MaxWorkers, + }, + queueName: config.QueueName, + exemplar: exemplar, + closed: make(chan struct{}), + workers: config.Workers, + name: config.Name, + } + if len(dbs) == 0 { + return nil, errors.New("no redis host specified") + } else if len(dbs) == 1 { + queue.client = redis.NewClient(&redis.Options{ + Network: config.Network, + Addr: strings.TrimSpace(dbs[0]), // use default Addr + Password: config.Password, // no password set + DB: config.DBIndex, // use default DB + }) + } else { + queue.client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: dbs, + }) + } + if err := queue.client.Ping().Err(); err != nil { + return nil, err + } + queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool) + + return queue, nil +} + +// Run runs the redis queue +func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), r.Shutdown) + atTerminate(context.Background(), r.Terminate) + + go func() { + _ = r.pool.AddWorkers(r.workers, 0) + }() + + go r.readToChan() + + log.Trace("RedisQueue: %s Waiting til closed", r.name) + <-r.closed + log.Trace("RedisQueue: %s Waiting til done", r.name) + r.pool.Wait() + + log.Trace("RedisQueue: %s Waiting til cleaned", r.name) + ctx, cancel := context.WithCancel(context.Background()) + atTerminate(ctx, cancel) + r.pool.CleanUp(ctx) + cancel() +} + +func (r *RedisQueue) readToChan() { + for { + select { + case <-r.closed: + // tell the pool to shutdown + r.pool.cancel() + return + default: + bs, err := r.client.LPop(r.queueName).Bytes() + if err != nil && err != redis.Nil { + log.Error("RedisQueue: %s Error on LPop: %v", r.name, err) + time.Sleep(time.Millisecond * 100) + continue + } + + if len(bs) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data Data + if r.exemplar != nil { + t := reflect.TypeOf(r.exemplar) + n := reflect.New(t) + ne := n.Elem() + err = json.Unmarshal(bs, ne.Addr().Interface()) + data = ne.Interface().(Data) + } else { + err = json.Unmarshal(bs, &data) + } + if err != nil { + log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("RedisQueue: %s Task found: %#v", r.name, data) + r.pool.Push(data) + } + } +} + +// Push implements Queue +func (r *RedisQueue) Push(data Data) error { + if r.exemplar != nil { + // Assert data is of same type as r.exemplar + value := reflect.ValueOf(data) + t := value.Type() + exemplarType := reflect.ValueOf(r.exemplar).Type() + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) + } + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + return r.client.RPush(r.queueName, bs).Err() +} + +// Shutdown processing from this queue +func (r *RedisQueue) Shutdown() { + log.Trace("Shutdown: %s", r.name) + r.lock.Lock() + select { + case <-r.closed: + default: + close(r.closed) + } + r.lock.Unlock() +} + +// Terminate this queue and close the queue +func (r *RedisQueue) Terminate() { + log.Trace("Terminating: %s", r.name) + r.Shutdown() + r.lock.Lock() + select { + case <-r.terminated: + r.lock.Unlock() + default: + close(r.terminated) + r.lock.Unlock() + if err := r.client.Close(); err != nil { + log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) + } + } +} + +// Name returns the name of this queue +func (r *RedisQueue) Name() string { + return r.name +} + +func init() { + queuesMap[RedisQueueType] = NewRedisQueue +} diff --git a/modules/queue/queue_test.go b/modules/queue/queue_test.go new file mode 100644 index 0000000000..3608f68d3d --- /dev/null +++ b/modules/queue/queue_test.go @@ -0,0 +1,43 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" +) + +type testData struct { + TestString string + TestInt int +} + +func TestToConfig(t *testing.T) { + cfg := testData{ + TestString: "Config", + TestInt: 10, + } + exemplar := testData{} + + cfg2I, err := toConfig(exemplar, cfg) + assert.NoError(t, err) + cfg2, ok := (cfg2I).(testData) + assert.True(t, ok) + assert.NotEqual(t, cfg2, exemplar) + assert.Equal(t, &cfg, &cfg2) + + cfgString, err := json.Marshal(cfg) + assert.NoError(t, err) + + cfg3I, err := toConfig(exemplar, cfgString) + assert.NoError(t, err) + cfg3, ok := (cfg3I).(testData) + assert.True(t, ok) + assert.Equal(t, cfg.TestString, cfg3.TestString) + assert.Equal(t, cfg.TestInt, cfg3.TestInt) + assert.NotEqual(t, cfg3, exemplar) +} diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go new file mode 100644 index 0000000000..d0b93b54d0 --- /dev/null +++ b/modules/queue/queue_wrapped.go @@ -0,0 +1,206 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// WrappedQueueType is the type for a wrapped delayed starting queue +const WrappedQueueType Type = "wrapped" + +// WrappedQueueConfiguration is the configuration for a WrappedQueue +type WrappedQueueConfiguration struct { + Underlying Type + Timeout time.Duration + MaxAttempts int + Config interface{} + QueueLength int + Name string +} + +type delayedStarter struct { + lock sync.Mutex + internal Queue + underlying Type + cfg interface{} + timeout time.Duration + maxAttempts int + name string +} + +// setInternal must be called with the lock locked. +func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error { + var ctx context.Context + var cancel context.CancelFunc + if q.timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), q.timeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } + + defer cancel() + // Ensure we also stop at shutdown + atShutdown(ctx, func() { + cancel() + }) + + i := 1 + for q.internal == nil { + select { + case <-ctx.Done(): + return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) + default: + queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) + if err == nil { + q.internal = queue + q.lock.Unlock() + break + } + if err.Error() != "resource temporarily unavailable" { + log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %v error: %v", i, q.underlying, q.name, q.cfg, err) + } + i++ + if q.maxAttempts > 0 && i > q.maxAttempts { + return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) + } + sleepTime := 100 * time.Millisecond + if q.timeout > 0 && q.maxAttempts > 0 { + sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts) + } + t := time.NewTimer(sleepTime) + select { + case <-ctx.Done(): + t.Stop() + case <-t.C: + } + } + } + return nil +} + +// WrappedQueue wraps a delayed starting queue +type WrappedQueue struct { + delayedStarter + handle HandlerFunc + exemplar interface{} + channel chan Data +} + +// NewWrappedQueue will attempt to create a queue of the provided type, +// but if there is a problem creating this queue it will instead create +// a WrappedQueue with delayed startup of the queue instead and a +// channel which will be redirected to the queue +func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(WrappedQueueConfiguration) + + queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar) + if err == nil { + // Just return the queue there is no need to wrap + return queue, nil + } + if IsErrInvalidConfiguration(err) { + // Retrying ain't gonna make this any better... + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + queue = &WrappedQueue{ + handle: handle, + channel: make(chan Data, config.QueueLength), + exemplar: exemplar, + delayedStarter: delayedStarter{ + cfg: config.Config, + underlying: config.Underlying, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, + }, + } + _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil) + return queue, nil +} + +// Name returns the name of the queue +func (q *WrappedQueue) Name() string { + return q.name + "-wrapper" +} + +// Push will push the data to the internal channel checking it against the exemplar +func (q *WrappedQueue) Push(data Data) error { + if q.exemplar != nil { + // Assert data is of same type as r.exemplar + value := reflect.ValueOf(data) + t := value.Type() + exemplarType := reflect.ValueOf(q.exemplar).Type() + if !t.AssignableTo(exemplarType) || data == nil { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + } + q.channel <- data + return nil +} + +// Run starts to run the queue and attempts to create the internal queue +func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + q.lock.Lock() + if q.internal == nil { + err := q.setInternal(atShutdown, q.handle, q.exemplar) + q.lock.Unlock() + if err != nil { + log.Fatal("Unable to set the internal queue for %s Error: %v", q.Name(), err) + return + } + go func() { + for data := range q.channel { + _ = q.internal.Push(data) + } + }() + } else { + q.lock.Unlock() + } + + q.internal.Run(atShutdown, atTerminate) + log.Trace("WrappedQueue: %s Done", q.name) +} + +// Shutdown this queue and stop processing +func (q *WrappedQueue) Shutdown() { + log.Trace("WrappedQueue: %s Shutdown", q.name) + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + if shutdownable, ok := q.internal.(Shutdownable); ok { + shutdownable.Shutdown() + } +} + +// Terminate this queue and close the queue +func (q *WrappedQueue) Terminate() { + log.Trace("WrappedQueue: %s Terminating", q.name) + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + if shutdownable, ok := q.internal.(Shutdownable); ok { + shutdownable.Terminate() + } +} + +func init() { + queuesMap[WrappedQueueType] = NewWrappedQueue +} diff --git a/modules/queue/setting.go b/modules/queue/setting.go new file mode 100644 index 0000000000..d5a6b41882 --- /dev/null +++ b/modules/queue/setting.go @@ -0,0 +1,75 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "encoding/json" + "fmt" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +func validType(t string) (Type, error) { + if len(t) == 0 { + return PersistableChannelQueueType, nil + } + for _, typ := range RegisteredTypes() { + if t == string(typ) { + return typ, nil + } + } + return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType)) +} + +// CreateQueue for name with provided handler and exemplar +func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { + q := setting.GetQueueSettings(name) + opts := make(map[string]interface{}) + opts["Name"] = name + opts["QueueLength"] = q.Length + opts["BatchLength"] = q.BatchLength + opts["DataDir"] = q.DataDir + opts["Addresses"] = q.Addresses + opts["Network"] = q.Network + opts["Password"] = q.Password + opts["DBIndex"] = q.DBIndex + opts["QueueName"] = q.QueueName + opts["Workers"] = q.Workers + opts["MaxWorkers"] = q.MaxWorkers + opts["BlockTimeout"] = q.BlockTimeout + opts["BoostTimeout"] = q.BoostTimeout + opts["BoostWorkers"] = q.BoostWorkers + + typ, err := validType(q.Type) + if err != nil { + log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) + } + + cfg, err := json.Marshal(opts) + if err != nil { + log.Error("Unable to marshall generic options: %v Error: %v", opts, err) + log.Error("Unable to create queue for %s", name, err) + return nil + } + + returnable, err := NewQueue(typ, handle, cfg, exemplar) + if q.WrapIfNecessary && err != nil { + log.Warn("Unable to create queue for %s: %v", name, err) + log.Warn("Attempting to create wrapped queue") + returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{ + Underlying: Type(q.Type), + Timeout: q.Timeout, + MaxAttempts: q.MaxAttempts, + Config: cfg, + QueueLength: q.Length, + }, exemplar) + } + if err != nil { + log.Error("Unable to create queue for %s: %v", name, err) + return nil + } + return returnable +} diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go new file mode 100644 index 0000000000..25fc7dd644 --- /dev/null +++ b/modules/queue/workerpool.go @@ -0,0 +1,325 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// WorkerPool takes +type WorkerPool struct { + lock sync.Mutex + baseCtx context.Context + cancel context.CancelFunc + cond *sync.Cond + qid int64 + maxNumberOfWorkers int + numberOfWorkers int + batchLength int + handle HandlerFunc + dataChan chan Data + blockTimeout time.Duration + boostTimeout time.Duration + boostWorkers int +} + +// Push pushes the data to the internal channel +func (p *WorkerPool) Push(data Data) { + p.lock.Lock() + if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { + p.lock.Unlock() + p.pushBoost(data) + } else { + p.lock.Unlock() + p.dataChan <- data + } +} + +func (p *WorkerPool) pushBoost(data Data) { + select { + case p.dataChan <- data: + default: + p.lock.Lock() + if p.blockTimeout <= 0 { + p.lock.Unlock() + p.dataChan <- data + return + } + ourTimeout := p.blockTimeout + timer := time.NewTimer(p.blockTimeout) + p.lock.Unlock() + select { + case p.dataChan <- data: + if timer.Stop() { + select { + case <-timer.C: + default: + } + } + case <-timer.C: + p.lock.Lock() + if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) { + p.lock.Unlock() + p.dataChan <- data + return + } + p.blockTimeout *= 2 + ctx, cancel := context.WithCancel(p.baseCtx) + mq := GetManager().GetManagedQueue(p.qid) + boost := p.boostWorkers + if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { + boost = p.maxNumberOfWorkers - p.numberOfWorkers + } + if mq != nil { + log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) + + start := time.Now() + pid := mq.RegisterWorkers(boost, start, false, start, cancel) + go func() { + <-ctx.Done() + mq.RemoveWorkers(pid) + cancel() + }() + } else { + log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) + } + go func() { + <-time.After(p.boostTimeout) + cancel() + p.lock.Lock() + p.blockTimeout /= 2 + p.lock.Unlock() + }() + p.addWorkers(ctx, boost) + p.lock.Unlock() + p.dataChan <- data + } + } +} + +// NumberOfWorkers returns the number of current workers in the pool +func (p *WorkerPool) NumberOfWorkers() int { + p.lock.Lock() + defer p.lock.Unlock() + return p.numberOfWorkers +} + +// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool +func (p *WorkerPool) MaxNumberOfWorkers() int { + p.lock.Lock() + defer p.lock.Unlock() + return p.maxNumberOfWorkers +} + +// BoostWorkers returns the number of workers for a boost +func (p *WorkerPool) BoostWorkers() int { + p.lock.Lock() + defer p.lock.Unlock() + return p.boostWorkers +} + +// BoostTimeout returns the timeout of the next boost +func (p *WorkerPool) BoostTimeout() time.Duration { + p.lock.Lock() + defer p.lock.Unlock() + return p.boostTimeout +} + +// BlockTimeout returns the timeout til the next boost +func (p *WorkerPool) BlockTimeout() time.Duration { + p.lock.Lock() + defer p.lock.Unlock() + return p.blockTimeout +} + +// SetSettings sets the setable boost values +func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { + p.lock.Lock() + defer p.lock.Unlock() + p.maxNumberOfWorkers = maxNumberOfWorkers + p.boostWorkers = boostWorkers + p.boostTimeout = timeout +} + +// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool +// Changing this number will not change the number of current workers but will change the limit +// for future additions +func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) { + p.lock.Lock() + defer p.lock.Unlock() + p.maxNumberOfWorkers = newMax +} + +// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit +func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { + var ctx context.Context + var cancel context.CancelFunc + start := time.Now() + end := start + hasTimeout := false + if timeout > 0 { + ctx, cancel = context.WithTimeout(p.baseCtx, timeout) + end = start.Add(timeout) + hasTimeout = true + } else { + ctx, cancel = context.WithCancel(p.baseCtx) + } + + mq := GetManager().GetManagedQueue(p.qid) + if mq != nil { + pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel) + go func() { + <-ctx.Done() + mq.RemoveWorkers(pid) + cancel() + }() + log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid) + } else { + log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) + + } + p.addWorkers(ctx, number) + return cancel +} + +// addWorkers adds workers to the pool +func (p *WorkerPool) addWorkers(ctx context.Context, number int) { + for i := 0; i < number; i++ { + p.lock.Lock() + if p.cond == nil { + p.cond = sync.NewCond(&p.lock) + } + p.numberOfWorkers++ + p.lock.Unlock() + go func() { + p.doWork(ctx) + + p.lock.Lock() + p.numberOfWorkers-- + if p.numberOfWorkers == 0 { + p.cond.Broadcast() + } else if p.numberOfWorkers < 0 { + // numberOfWorkers can't go negative but... + log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid) + p.numberOfWorkers = 0 + p.cond.Broadcast() + } + p.lock.Unlock() + }() + } +} + +// Wait for WorkerPool to finish +func (p *WorkerPool) Wait() { + p.lock.Lock() + defer p.lock.Unlock() + if p.cond == nil { + p.cond = sync.NewCond(&p.lock) + } + if p.numberOfWorkers <= 0 { + return + } + p.cond.Wait() +} + +// CleanUp will drain the remaining contents of the channel +// This should be called after AddWorkers context is closed +func (p *WorkerPool) CleanUp(ctx context.Context) { + log.Trace("WorkerPool: %d CleanUp", p.qid) + close(p.dataChan) + for data := range p.dataChan { + p.handle(data) + select { + case <-ctx.Done(): + log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid) + return + default: + } + } + log.Trace("WorkerPool: %d CleanUp Done", p.qid) +} + +func (p *WorkerPool) doWork(ctx context.Context) { + delay := time.Millisecond * 300 + var data = make([]Data, 0, p.batchLength) + for { + select { + case <-ctx.Done(): + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + case datum, ok := <-p.dataChan: + if !ok { + // the dataChan has been closed - we should finish up: + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + } + data = append(data, datum) + if len(data) >= p.batchLength { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + data = make([]Data, 0, p.batchLength) + } + default: + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + if timer.Stop() { + select { + case <-timer.C: + default: + } + } + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + case datum, ok := <-p.dataChan: + if timer.Stop() { + select { + case <-timer.C: + default: + } + } + if !ok { + // the dataChan has been closed - we should finish up: + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + } + log.Trace("Worker shutting down") + return + } + data = append(data, datum) + if len(data) >= p.batchLength { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + data = make([]Data, 0, p.batchLength) + } + case <-timer.C: + delay = time.Millisecond * 100 + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + p.handle(data...) + data = make([]Data, 0, p.batchLength) + } + + } + } + } +} diff --git a/modules/setting/queue.go b/modules/setting/queue.go new file mode 100644 index 0000000000..546802715f --- /dev/null +++ b/modules/setting/queue.go @@ -0,0 +1,159 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package setting + +import ( + "fmt" + "path" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// QueueSettings represent the settings for a queue from the ini +type QueueSettings struct { + DataDir string + Length int + BatchLength int + ConnectionString string + Type string + Network string + Addresses string + Password string + QueueName string + DBIndex int + WrapIfNecessary bool + MaxAttempts int + Timeout time.Duration + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int +} + +// Queue settings +var Queue = QueueSettings{} + +// GetQueueSettings returns the queue settings for the appropriately named queue +func GetQueueSettings(name string) QueueSettings { + q := QueueSettings{} + sec := Cfg.Section("queue." + name) + // DataDir is not directly inheritable + q.DataDir = path.Join(Queue.DataDir, name) + // QueueName is not directly inheritable either + q.QueueName = name + Queue.QueueName + for _, key := range sec.Keys() { + switch key.Name() { + case "DATADIR": + q.DataDir = key.MustString(q.DataDir) + case "QUEUE_NAME": + q.QueueName = key.MustString(q.QueueName) + } + } + if !path.IsAbs(q.DataDir) { + q.DataDir = path.Join(AppDataPath, q.DataDir) + } + sec.Key("DATADIR").SetValue(q.DataDir) + // The rest are... + q.Length = sec.Key("LENGTH").MustInt(Queue.Length) + q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) + q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString) + q.Type = sec.Key("TYPE").MustString(Queue.Type) + q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary) + q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts) + q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout) + q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers) + q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers) + q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout) + q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout) + q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers) + + q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString) + return q +} + +// NewQueueService sets up the default settings for Queues +// This is exported for tests to be able to use the queue +func NewQueueService() { + sec := Cfg.Section("queue") + Queue.DataDir = sec.Key("DATADIR").MustString("queues/") + if !path.IsAbs(Queue.DataDir) { + Queue.DataDir = path.Join(AppDataPath, Queue.DataDir) + } + Queue.Length = sec.Key("LENGTH").MustInt(20) + Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) + Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, "")) + Queue.Type = sec.Key("TYPE").MustString("") + Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString) + Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true) + Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) + Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second) + Queue.Workers = sec.Key("WORKERS").MustInt(1) + Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10) + Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second) + Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute) + Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5) + Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue") + + // Now handle the old issue_indexer configuration + section := Cfg.Section("queue.issue_indexer") + issueIndexerSectionMap := map[string]string{} + for _, key := range section.Keys() { + issueIndexerSectionMap[key.Name()] = key.Value() + } + if _, ok := issueIndexerSectionMap["TYPE"]; !ok { + switch Indexer.IssueQueueType { + case LevelQueueType: + section.Key("TYPE").SetValue("level") + case ChannelQueueType: + section.Key("TYPE").SetValue("persistable-channel") + case RedisQueueType: + section.Key("TYPE").SetValue("redis") + default: + log.Fatal("Unsupported indexer queue type: %v", + Indexer.IssueQueueType) + } + } + if _, ok := issueIndexerSectionMap["LENGTH"]; !ok { + section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + } + if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok { + section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + } + if _, ok := issueIndexerSectionMap["DATADIR"]; !ok { + section.Key("DATADIR").SetValue(Indexer.IssueQueueDir) + } + if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok { + section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr) + } +} + +// ParseQueueConnStr parses a queue connection string +func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) { + fields := strings.Fields(connStr) + for _, f := range fields { + items := strings.SplitN(f, "=", 2) + if len(items) < 2 { + continue + } + switch strings.ToLower(items[0]) { + case "network": + network = items[1] + case "addrs": + addrs = items[1] + case "password": + password = items[1] + case "db": + dbIdx, err = strconv.Atoi(items[1]) + if err != nil { + return + } + } + } + return +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 2a5e37b41b..17c84d3d31 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -1093,4 +1093,5 @@ func NewServices() { newMigrationsService() newIndexerService() newTaskService() + NewQueueService() } diff --git a/modules/setting/task.go b/modules/setting/task.go index 97704d4a4d..81ed39a9fb 100644 --- a/modules/setting/task.go +++ b/modules/setting/task.go @@ -4,22 +4,15 @@ package setting -var ( - // Task settings - Task = struct { - QueueType string - QueueLength int - QueueConnStr string - }{ - QueueType: ChannelQueueType, - QueueLength: 1000, - QueueConnStr: "addrs=127.0.0.1:6379 db=0", - } -) - func newTaskService() { - sec := Cfg.Section("task") - Task.QueueType = sec.Key("QUEUE_TYPE").MustString(ChannelQueueType) - Task.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000) - Task.QueueConnStr = sec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0") + taskSec := Cfg.Section("task") + queueTaskSec := Cfg.Section("queue.task") + switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) { + case ChannelQueueType: + queueTaskSec.Key("TYPE").MustString("persistable-channel") + case RedisQueueType: + queueTaskSec.Key("TYPE").MustString("redis") + } + queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000)) + queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")) } diff --git a/modules/task/queue.go b/modules/task/queue.go deleted file mode 100644 index ddee0b3d46..0000000000 --- a/modules/task/queue.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2019 Gitea. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import "code.gitea.io/gitea/models" - -// Queue defines an interface to run task queue -type Queue interface { - Run() error - Push(*models.Task) error - Stop() -} diff --git a/modules/task/queue_channel.go b/modules/task/queue_channel.go deleted file mode 100644 index da541f4755..0000000000 --- a/modules/task/queue_channel.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import ( - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/log" -) - -var ( - _ Queue = &ChannelQueue{} -) - -// ChannelQueue implements -type ChannelQueue struct { - queue chan *models.Task -} - -// NewChannelQueue create a memory channel queue -func NewChannelQueue(queueLen int) *ChannelQueue { - return &ChannelQueue{ - queue: make(chan *models.Task, queueLen), - } -} - -// Run starts to run the queue -func (c *ChannelQueue) Run() error { - for task := range c.queue { - err := Run(task) - if err != nil { - log.Error("Run task failed: %s", err.Error()) - } - } - return nil -} - -// Push will push the task ID to queue -func (c *ChannelQueue) Push(task *models.Task) error { - c.queue <- task - return nil -} - -// Stop stop the queue -func (c *ChannelQueue) Stop() { - close(c.queue) -} diff --git a/modules/task/queue_redis.go b/modules/task/queue_redis.go deleted file mode 100644 index 127de0cdbf..0000000000 --- a/modules/task/queue_redis.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import ( - "encoding/json" - "errors" - "strconv" - "strings" - "time" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/log" - - "github.com/go-redis/redis" -) - -var ( - _ Queue = &RedisQueue{} -) - -type redisClient interface { - RPush(key string, args ...interface{}) *redis.IntCmd - LPop(key string) *redis.StringCmd - Ping() *redis.StatusCmd -} - -// RedisQueue redis queue -type RedisQueue struct { - client redisClient - queueName string - closeChan chan bool -} - -func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { - fields := strings.Fields(connStr) - for _, f := range fields { - items := strings.SplitN(f, "=", 2) - if len(items) < 2 { - continue - } - switch strings.ToLower(items[0]) { - case "addrs": - addrs = items[1] - case "password": - password = items[1] - case "db": - dbIdx, err = strconv.Atoi(items[1]) - if err != nil { - return - } - } - } - return -} - -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) { - dbs := strings.Split(addrs, ",") - var queue = RedisQueue{ - queueName: "task_queue", - closeChan: make(chan bool), - } - if len(dbs) == 0 { - return nil, errors.New("no redis host found") - } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: password, // no password set - DB: dbIdx, // use default DB - }) - } else { - // cluster will ignore db - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - Password: password, - }) - } - if err := queue.client.Ping().Err(); err != nil { - return nil, err - } - return &queue, nil -} - -// Run starts to run the queue -func (r *RedisQueue) Run() error { - for { - select { - case <-r.closeChan: - return nil - case <-time.After(time.Millisecond * 100): - } - - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil { - if err != redis.Nil { - log.Error("LPop failed: %v", err) - } - time.Sleep(time.Millisecond * 100) - continue - } - - var task models.Task - err = json.Unmarshal(bs, &task) - if err != nil { - log.Error("Unmarshal task failed: %s", err.Error()) - } else { - err = Run(&task) - if err != nil { - log.Error("Run task failed: %s", err.Error()) - } - } - } -} - -// Push implements Queue -func (r *RedisQueue) Push(task *models.Task) error { - bs, err := json.Marshal(task) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -} - -// Stop stop the queue -func (r *RedisQueue) Stop() { - r.closeChan <- true -} diff --git a/modules/task/task.go b/modules/task/task.go index 64744afe7a..416f0c696a 100644 --- a/modules/task/task.go +++ b/modules/task/task.go @@ -8,14 +8,15 @@ import ( "fmt" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations/base" - "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/structs" ) // taskQueue is a global queue of tasks -var taskQueue Queue +var taskQueue queue.Queue // Run a task func Run(t *models.Task) error { @@ -23,38 +24,32 @@ func Run(t *models.Task) error { case structs.TaskTypeMigrateRepo: return runMigrateTask(t) default: - return fmt.Errorf("Unknow task type: %d", t.Type) + return fmt.Errorf("Unknown task type: %d", t.Type) } } // Init will start the service to get all unfinished tasks and run them func Init() error { - switch setting.Task.QueueType { - case setting.ChannelQueueType: - taskQueue = NewChannelQueue(setting.Task.QueueLength) - case setting.RedisQueueType: - var err error - addrs, pass, idx, err := parseConnStr(setting.Task.QueueConnStr) - if err != nil { - return err - } - taskQueue, err = NewRedisQueue(addrs, pass, idx) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported task queue type: %v", setting.Task.QueueType) + taskQueue = queue.CreateQueue("task", handle, &models.Task{}) + + if taskQueue == nil { + return fmt.Errorf("Unable to create Task Queue") } - go func() { - if err := taskQueue.Run(); err != nil { - log.Error("taskQueue.Run end failed: %v", err) - } - }() + go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) return nil } +func handle(data ...queue.Data) { + for _, datum := range data { + task := datum.(*models.Task) + if err := Run(task); err != nil { + log.Error("Run task failed: %v", err) + } + } +} + // MigrateRepository add migration repository to task func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error { task, err := models.CreateMigrateTask(doer, u, opts) |