aboutsummaryrefslogtreecommitdiffstats
path: root/modules/setting
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-01-07 11:23:09 +0000
committerAntoine GIRARD <sapk@users.noreply.github.com>2020-01-07 12:23:09 +0100
commit62eb1b0f2530a5ae1ce9b729378c0c8066174215 (patch)
treee567b2a9d91e69c0f2bccfeaf1a7341b4dda2706 /modules/setting
parentf71e1c8e796b099f4634bcd358e48189a97dcbad (diff)
downloadgitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.tar.gz
gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.zip
Graceful Queues: Issue Indexing and Tasks (#9363)
* Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'modules/setting')
-rw-r--r--modules/setting/queue.go159
-rw-r--r--modules/setting/setting.go1
-rw-r--r--modules/setting/task.go27
3 files changed, 170 insertions, 17 deletions
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
new file mode 100644
index 0000000000..546802715f
--- /dev/null
+++ b/modules/setting/queue.go
@@ -0,0 +1,159 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package setting
+
+import (
+ "fmt"
+ "path"
+ "strconv"
+ "strings"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// QueueSettings represent the settings for a queue from the ini
+type QueueSettings struct {
+ DataDir string
+ Length int
+ BatchLength int
+ ConnectionString string
+ Type string
+ Network string
+ Addresses string
+ Password string
+ QueueName string
+ DBIndex int
+ WrapIfNecessary bool
+ MaxAttempts int
+ Timeout time.Duration
+ Workers int
+ MaxWorkers int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+}
+
+// Queue settings
+var Queue = QueueSettings{}
+
+// GetQueueSettings returns the queue settings for the appropriately named queue
+func GetQueueSettings(name string) QueueSettings {
+ q := QueueSettings{}
+ sec := Cfg.Section("queue." + name)
+ // DataDir is not directly inheritable
+ q.DataDir = path.Join(Queue.DataDir, name)
+ // QueueName is not directly inheritable either
+ q.QueueName = name + Queue.QueueName
+ for _, key := range sec.Keys() {
+ switch key.Name() {
+ case "DATADIR":
+ q.DataDir = key.MustString(q.DataDir)
+ case "QUEUE_NAME":
+ q.QueueName = key.MustString(q.QueueName)
+ }
+ }
+ if !path.IsAbs(q.DataDir) {
+ q.DataDir = path.Join(AppDataPath, q.DataDir)
+ }
+ sec.Key("DATADIR").SetValue(q.DataDir)
+ // The rest are...
+ q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
+ q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
+ q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
+ q.Type = sec.Key("TYPE").MustString(Queue.Type)
+ q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
+ q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
+ q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
+ q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
+ q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
+ q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
+ q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
+ q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
+
+ q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
+ return q
+}
+
+// NewQueueService sets up the default settings for Queues
+// This is exported for tests to be able to use the queue
+func NewQueueService() {
+ sec := Cfg.Section("queue")
+ Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
+ if !path.IsAbs(Queue.DataDir) {
+ Queue.DataDir = path.Join(AppDataPath, Queue.DataDir)
+ }
+ Queue.Length = sec.Key("LENGTH").MustInt(20)
+ Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
+ Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
+ Queue.Type = sec.Key("TYPE").MustString("")
+ Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
+ Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
+ Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
+ Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
+ Queue.Workers = sec.Key("WORKERS").MustInt(1)
+ Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
+ Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
+ Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
+ Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
+ Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
+
+ // Now handle the old issue_indexer configuration
+ section := Cfg.Section("queue.issue_indexer")
+ issueIndexerSectionMap := map[string]string{}
+ for _, key := range section.Keys() {
+ issueIndexerSectionMap[key.Name()] = key.Value()
+ }
+ if _, ok := issueIndexerSectionMap["TYPE"]; !ok {
+ switch Indexer.IssueQueueType {
+ case LevelQueueType:
+ section.Key("TYPE").SetValue("level")
+ case ChannelQueueType:
+ section.Key("TYPE").SetValue("persistable-channel")
+ case RedisQueueType:
+ section.Key("TYPE").SetValue("redis")
+ default:
+ log.Fatal("Unsupported indexer queue type: %v",
+ Indexer.IssueQueueType)
+ }
+ }
+ if _, ok := issueIndexerSectionMap["LENGTH"]; !ok {
+ section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
+ }
+ if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok {
+ section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
+ }
+ if _, ok := issueIndexerSectionMap["DATADIR"]; !ok {
+ section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
+ }
+ if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok {
+ section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
+ }
+}
+
+// ParseQueueConnStr parses a queue connection string
+func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
+ fields := strings.Fields(connStr)
+ for _, f := range fields {
+ items := strings.SplitN(f, "=", 2)
+ if len(items) < 2 {
+ continue
+ }
+ switch strings.ToLower(items[0]) {
+ case "network":
+ network = items[1]
+ case "addrs":
+ addrs = items[1]
+ case "password":
+ password = items[1]
+ case "db":
+ dbIdx, err = strconv.Atoi(items[1])
+ if err != nil {
+ return
+ }
+ }
+ }
+ return
+}
diff --git a/modules/setting/setting.go b/modules/setting/setting.go
index 2a5e37b41b..17c84d3d31 100644
--- a/modules/setting/setting.go
+++ b/modules/setting/setting.go
@@ -1093,4 +1093,5 @@ func NewServices() {
newMigrationsService()
newIndexerService()
newTaskService()
+ NewQueueService()
}
diff --git a/modules/setting/task.go b/modules/setting/task.go
index 97704d4a4d..81ed39a9fb 100644
--- a/modules/setting/task.go
+++ b/modules/setting/task.go
@@ -4,22 +4,15 @@
package setting
-var (
- // Task settings
- Task = struct {
- QueueType string
- QueueLength int
- QueueConnStr string
- }{
- QueueType: ChannelQueueType,
- QueueLength: 1000,
- QueueConnStr: "addrs=127.0.0.1:6379 db=0",
- }
-)
-
func newTaskService() {
- sec := Cfg.Section("task")
- Task.QueueType = sec.Key("QUEUE_TYPE").MustString(ChannelQueueType)
- Task.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000)
- Task.QueueConnStr = sec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")
+ taskSec := Cfg.Section("task")
+ queueTaskSec := Cfg.Section("queue.task")
+ switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) {
+ case ChannelQueueType:
+ queueTaskSec.Key("TYPE").MustString("persistable-channel")
+ case RedisQueueType:
+ queueTaskSec.Key("TYPE").MustString("redis")
+ }
+ queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000))
+ queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0"))
}