summaryrefslogtreecommitdiffstats
path: root/modules/setting/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/setting/queue.go')
-rw-r--r--modules/setting/queue.go42
1 files changed, 25 insertions, 17 deletions
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index a67d0d849a..bd4bf48e33 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -39,8 +39,12 @@ var Queue = QueueSettings{}
// GetQueueSettings returns the queue settings for the appropriately named queue
func GetQueueSettings(name string) QueueSettings {
+ return getQueueSettings(CfgProvider, name)
+}
+
+func getQueueSettings(rootCfg ConfigProvider, name string) QueueSettings {
q := QueueSettings{}
- sec := Cfg.Section("queue." + name)
+ sec := rootCfg.Section("queue." + name)
q.Name = name
// DataDir is not directly inheritable
@@ -82,10 +86,14 @@ func GetQueueSettings(name string) QueueSettings {
return q
}
-// NewQueueService sets up the default settings for Queues
+// LoadQueueSettings sets up the default settings for Queues
// This is exported for tests to be able to use the queue
-func NewQueueService() {
- sec := Cfg.Section("queue")
+func LoadQueueSettings() {
+ loadQueueFrom(CfgProvider)
+}
+
+func loadQueueFrom(rootCfg ConfigProvider) {
+ sec := rootCfg.Section("queue")
Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
if !filepath.IsAbs(Queue.DataDir) {
Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
@@ -108,10 +116,10 @@ func NewQueueService() {
// Now handle the old issue_indexer configuration
// FIXME: DEPRECATED to be removed in v1.18.0
- section := Cfg.Section("queue.issue_indexer")
+ section := rootCfg.Section("queue.issue_indexer")
directlySet := toDirectlySetKeysSet(section)
if !directlySet.Contains("TYPE") && defaultType == "" {
- switch typ := Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(""); typ {
+ switch typ := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(""); typ {
case "levelqueue":
_, _ = section.NewKey("TYPE", "level")
case "channel":
@@ -125,25 +133,25 @@ func NewQueueService() {
}
}
if !directlySet.Contains("LENGTH") {
- length := Cfg.Section("indexer").Key("UPDATE_BUFFER_LEN").MustInt(0)
+ length := rootCfg.Section("indexer").Key("UPDATE_BUFFER_LEN").MustInt(0)
if length != 0 {
_, _ = section.NewKey("LENGTH", strconv.Itoa(length))
}
}
if !directlySet.Contains("BATCH_LENGTH") {
- fallback := Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(0)
+ fallback := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(0)
if fallback != 0 {
_, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(fallback))
}
}
if !directlySet.Contains("DATADIR") {
- queueDir := filepath.ToSlash(Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_DIR").MustString(""))
+ queueDir := filepath.ToSlash(rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_DIR").MustString(""))
if queueDir != "" {
_, _ = section.NewKey("DATADIR", queueDir)
}
}
if !directlySet.Contains("CONN_STR") {
- connStr := Cfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString("")
+ connStr := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString("")
if connStr != "" {
_, _ = section.NewKey("CONN_STR", connStr)
}
@@ -153,31 +161,31 @@ func NewQueueService() {
// - will need to set default for [queue.*)] LENGTH appropriately though though
// Handle the old mailer configuration
- handleOldLengthConfiguration("mailer", "mailer", "SEND_BUFFER_LEN", 100)
+ handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN", 100)
// Handle the old test pull requests configuration
// Please note this will be a unique queue
- handleOldLengthConfiguration("pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH", 1000)
+ handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH", 1000)
// Handle the old mirror queue configuration
// Please note this will be a unique queue
- handleOldLengthConfiguration("mirror", "repository", "MIRROR_QUEUE_LENGTH", 1000)
+ handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH", 1000)
}
// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
-func handleOldLengthConfiguration(queueName, oldSection, oldKey string, defaultValue int) {
- if Cfg.Section(oldSection).HasKey(oldKey) {
+func handleOldLengthConfiguration(rootCfg ConfigProvider, queueName, oldSection, oldKey string, defaultValue int) {
+ if rootCfg.Section(oldSection).HasKey(oldKey) {
log.Error("Deprecated fallback for %s queue length `[%s]` `%s` present. Use `[queue.%s]` `LENGTH`. This will be removed in v1.18.0", queueName, queueName, oldSection, oldKey)
}
- value := Cfg.Section(oldSection).Key(oldKey).MustInt(defaultValue)
+ value := rootCfg.Section(oldSection).Key(oldKey).MustInt(defaultValue)
// Don't override with 0
if value <= 0 {
return
}
- section := Cfg.Section("queue." + queueName)
+ section := rootCfg.Section("queue." + queueName)
directlySet := toDirectlySetKeysSet(section)
if !directlySet.Contains("LENGTH") {
_, _ = section.NewKey("LENGTH", strconv.Itoa(value))