diff options
author | zeripath <art27@cantab.net> | 2020-01-07 11:23:09 +0000 |
---|---|---|
committer | Antoine GIRARD <sapk@users.noreply.github.com> | 2020-01-07 12:23:09 +0100 |
commit | 62eb1b0f2530a5ae1ce9b729378c0c8066174215 (patch) | |
tree | e567b2a9d91e69c0f2bccfeaf1a7341b4dda2706 /modules/setting | |
parent | f71e1c8e796b099f4634bcd358e48189a97dcbad (diff) | |
download | gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.tar.gz gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.zip |
Graceful Queues: Issue Indexing and Tasks (#9363)
* Queue: Add generic graceful queues with settings
* Queue & Setting: Add worker pool implementation
* Queue: Add worker settings
* Queue: Make resizing worker pools
* Queue: Add name variable to queues
* Queue: Add monitoring
* Queue: Improve logging
* Issues: Gracefulise the issues indexer
Remove the old now unused specific queues
* Task: Move to generic queue and gracefulise
* Issues: Standardise the issues indexer queue settings
* Fix test
* Queue: Allow Redis to connect to unix
* Prevent deadlock during early shutdown of issue indexer
* Add MaxWorker settings to queues
* Merge branch 'master' into graceful-queues
* Update modules/indexer/issues/indexer.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/indexer/issues/indexer.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_channel.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_disk.go
* Update modules/queue/queue_disk_channel.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Rename queue.Description to queue.ManagedQueue as per @guillep2k
* Cancel pool workers when removed
* Remove dependency on queue from setting
* Update modules/queue/queue_redis.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* As per @guillep2k add mutex locks on shutdown/terminate
* move unlocking out of setInternal
* Add warning if number of workers < 0
* Small changes as per @guillep2k
* No redis host specified not found
* Clean up documentation for queues
* Update docs/content/doc/advanced/config-cheat-sheet.en-us.md
* Update modules/indexer/issues/indexer_test.go
* Ensure that persistable channel queue is added to manager
* Rename QUEUE_NAME REDIS_QUEUE_NAME
* Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME"
This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df.
Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: techknowlogick <matti@mdranta.net>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'modules/setting')
-rw-r--r-- | modules/setting/queue.go | 159 | ||||
-rw-r--r-- | modules/setting/setting.go | 1 | ||||
-rw-r--r-- | modules/setting/task.go | 27 |
3 files changed, 170 insertions, 17 deletions
diff --git a/modules/setting/queue.go b/modules/setting/queue.go new file mode 100644 index 0000000000..546802715f --- /dev/null +++ b/modules/setting/queue.go @@ -0,0 +1,159 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package setting + +import ( + "fmt" + "path" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// QueueSettings represent the settings for a queue from the ini +type QueueSettings struct { + DataDir string + Length int + BatchLength int + ConnectionString string + Type string + Network string + Addresses string + Password string + QueueName string + DBIndex int + WrapIfNecessary bool + MaxAttempts int + Timeout time.Duration + Workers int + MaxWorkers int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int +} + +// Queue settings +var Queue = QueueSettings{} + +// GetQueueSettings returns the queue settings for the appropriately named queue +func GetQueueSettings(name string) QueueSettings { + q := QueueSettings{} + sec := Cfg.Section("queue." + name) + // DataDir is not directly inheritable + q.DataDir = path.Join(Queue.DataDir, name) + // QueueName is not directly inheritable either + q.QueueName = name + Queue.QueueName + for _, key := range sec.Keys() { + switch key.Name() { + case "DATADIR": + q.DataDir = key.MustString(q.DataDir) + case "QUEUE_NAME": + q.QueueName = key.MustString(q.QueueName) + } + } + if !path.IsAbs(q.DataDir) { + q.DataDir = path.Join(AppDataPath, q.DataDir) + } + sec.Key("DATADIR").SetValue(q.DataDir) + // The rest are... + q.Length = sec.Key("LENGTH").MustInt(Queue.Length) + q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) + q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString) + q.Type = sec.Key("TYPE").MustString(Queue.Type) + q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary) + q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts) + q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout) + q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers) + q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers) + q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout) + q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout) + q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers) + + q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString) + return q +} + +// NewQueueService sets up the default settings for Queues +// This is exported for tests to be able to use the queue +func NewQueueService() { + sec := Cfg.Section("queue") + Queue.DataDir = sec.Key("DATADIR").MustString("queues/") + if !path.IsAbs(Queue.DataDir) { + Queue.DataDir = path.Join(AppDataPath, Queue.DataDir) + } + Queue.Length = sec.Key("LENGTH").MustInt(20) + Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) + Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, "")) + Queue.Type = sec.Key("TYPE").MustString("") + Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString) + Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true) + Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) + Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second) + Queue.Workers = sec.Key("WORKERS").MustInt(1) + Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10) + Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second) + Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute) + Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5) + Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue") + + // Now handle the old issue_indexer configuration + section := Cfg.Section("queue.issue_indexer") + issueIndexerSectionMap := map[string]string{} + for _, key := range section.Keys() { + issueIndexerSectionMap[key.Name()] = key.Value() + } + if _, ok := issueIndexerSectionMap["TYPE"]; !ok { + switch Indexer.IssueQueueType { + case LevelQueueType: + section.Key("TYPE").SetValue("level") + case ChannelQueueType: + section.Key("TYPE").SetValue("persistable-channel") + case RedisQueueType: + section.Key("TYPE").SetValue("redis") + default: + log.Fatal("Unsupported indexer queue type: %v", + Indexer.IssueQueueType) + } + } + if _, ok := issueIndexerSectionMap["LENGTH"]; !ok { + section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + } + if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok { + section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + } + if _, ok := issueIndexerSectionMap["DATADIR"]; !ok { + section.Key("DATADIR").SetValue(Indexer.IssueQueueDir) + } + if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok { + section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr) + } +} + +// ParseQueueConnStr parses a queue connection string +func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) { + fields := strings.Fields(connStr) + for _, f := range fields { + items := strings.SplitN(f, "=", 2) + if len(items) < 2 { + continue + } + switch strings.ToLower(items[0]) { + case "network": + network = items[1] + case "addrs": + addrs = items[1] + case "password": + password = items[1] + case "db": + dbIdx, err = strconv.Atoi(items[1]) + if err != nil { + return + } + } + } + return +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 2a5e37b41b..17c84d3d31 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -1093,4 +1093,5 @@ func NewServices() { newMigrationsService() newIndexerService() newTaskService() + NewQueueService() } diff --git a/modules/setting/task.go b/modules/setting/task.go index 97704d4a4d..81ed39a9fb 100644 --- a/modules/setting/task.go +++ b/modules/setting/task.go @@ -4,22 +4,15 @@ package setting -var ( - // Task settings - Task = struct { - QueueType string - QueueLength int - QueueConnStr string - }{ - QueueType: ChannelQueueType, - QueueLength: 1000, - QueueConnStr: "addrs=127.0.0.1:6379 db=0", - } -) - func newTaskService() { - sec := Cfg.Section("task") - Task.QueueType = sec.Key("QUEUE_TYPE").MustString(ChannelQueueType) - Task.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000) - Task.QueueConnStr = sec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0") + taskSec := Cfg.Section("task") + queueTaskSec := Cfg.Section("queue.task") + switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) { + case ChannelQueueType: + queueTaskSec.Key("TYPE").MustString("persistable-channel") + case RedisQueueType: + queueTaskSec.Key("TYPE").MustString("redis") + } + queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000)) + queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")) } |