summaryrefslogtreecommitdiffstats
path: root/modules/setting
diff options
context:
space:
mode:
authorwxiaoguang <wxiaoguang@gmail.com>2023-05-08 19:49:59 +0800
committerGitHub <noreply@github.com>2023-05-08 19:49:59 +0800
commit6f9c278559789066aa831c1df25b0d866103d02d (patch)
treee3a1880e0d4cf88916f9d1b65d82fbd4c41ea47f /modules/setting
parentcb700aedd1e670fb47b8cf0cd67fb117a1ad88a2 (diff)
downloadgitea-6f9c278559789066aa831c1df25b0d866103d02d.tar.gz
gitea-6f9c278559789066aa831c1df25b0d866103d02d.zip
Rewrite queue (#24505)
# ⚠️ Breaking Many deprecated queue config options are removed (actually, they should have been removed in 1.18/1.19). If you see the fatal message when starting Gitea: "Please update your app.ini to remove deprecated config options", please follow the error messages to remove these options from your app.ini. Example: ``` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options ``` Many options in `[queue]` are dropped, including: `WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`, `BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed from app.ini. # The problem The old queue package has some legacy problems: * complexity: I doubt many people could tell how it works. * maintainability: Too many channels and mutex/cond are mixed together, too many different structs/interfaces depend on each other. * stability: due to the complexity & maintainability, sometimes there are strange bugs that are difficult to debug, and some code doesn't have tests (indeed some code is difficult to test because a lot of things are mixed together). * general applicability: although it is called "queue", its behavior is not a well-known queue. * scalability: it doesn't seem easy to make it work with a cluster without breaking its behaviors. It came from some very old code to "avoid breaking", however, its technical debt is too heavy now. It's a good time to introduce a better "queue" package. # The new queue package It keeps using the old config and concepts as much as possible. * It only contains two major kinds of concepts: * The "base queue": channel, levelqueue, redis * They have the same abstraction, the same interface, and they are tested by the same testing code. 
* The "WorkerPoolQueue", it uses the "base queue" to provide "worker pool" function, calls the "handler" to process the data in the base queue. * The new code doesn't do "PushBack" * Think about a queue with many workers, the "PushBack" can't guarantee the order for re-queued unhandled items, so in new code it just does "normal push" * The new code doesn't do "pause/resume" * The "pause/resume" was designed to handle some handler's failure: eg: document indexer (elasticsearch) is down * If a queue is paused for a long time, either the producers block or the new items are dropped. * The new code doesn't do such "pause/resume" trick, it's not a common queue's behavior and it doesn't help much. * If there are unhandled items, the "push" function just blocks for a few seconds and then re-queues them and retries. * The new code doesn't do "worker booster" * Gitea's queue's handlers are light functions, the cost is only the go-routine, so it doesn't make sense to "boost" them. * The new code only uses "max worker number" to limit the concurrent workers. * The new "Push" never blocks forever * Instead of creating more and more blocking goroutines, returning an error is more friendly to the server and to the end user. There are more details in code comments: eg: the "Flush" problem, the strange "code.index" hanging problem, the "immediate" queue problem. Almost ready for review. TODO: * [x] add some necessary comments during review * [x] add some more tests if necessary * [x] update documents and config options * [x] test max worker / active worker * [x] re-run the CI tasks to see whether any test is flaky * [x] improve the `handleOldLengthConfiguration` to provide more friendly messages * [x] fine tune default config values (eg: length?) ## Code coverage: ![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
Diffstat (limited to 'modules/setting')
-rw-r--r--modules/setting/config_provider.go6
-rw-r--r--modules/setting/cron_test.go2
-rw-r--r--modules/setting/indexer.go9
-rw-r--r--modules/setting/queue.go241
-rw-r--r--modules/setting/storage_test.go18
5 files changed, 89 insertions, 187 deletions
diff --git a/modules/setting/config_provider.go b/modules/setting/config_provider.go
index 1685958298..24825a6205 100644
--- a/modules/setting/config_provider.go
+++ b/modules/setting/config_provider.go
@@ -42,12 +42,12 @@ type iniFileConfigProvider struct {
// NewEmptyConfigProvider create a new empty config provider
func NewEmptyConfigProvider() ConfigProvider {
- cp, _ := newConfigProviderFromData("")
+ cp, _ := NewConfigProviderFromData("")
return cp
}
-// newConfigProviderFromData this function is only for testing
-func newConfigProviderFromData(configContent string) (ConfigProvider, error) {
+// NewConfigProviderFromData this function is only for testing
+func NewConfigProviderFromData(configContent string) (ConfigProvider, error) {
var cfg *ini.File
var err error
if configContent == "" {
diff --git a/modules/setting/cron_test.go b/modules/setting/cron_test.go
index 8d58cf8b48..3187ab18a2 100644
--- a/modules/setting/cron_test.go
+++ b/modules/setting/cron_test.go
@@ -26,7 +26,7 @@ BASE = true
SECOND = white rabbit
EXTEND = true
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
extended := &Extended{
diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go
index 8aee8596de..6836e62311 100644
--- a/modules/setting/indexer.go
+++ b/modules/setting/indexer.go
@@ -70,15 +70,6 @@ func loadIndexerFrom(rootCfg ConfigProvider) {
Indexer.IssueIndexerName = sec.Key("ISSUE_INDEXER_NAME").MustString(Indexer.IssueIndexerName)
- // The following settings are deprecated and can be overridden by settings in [queue] or [queue.issue_indexer]
- // DEPRECATED should not be removed because users maybe upgrade from lower version to the latest version
- // if these are removed, the warning will not be shown
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_TYPE", "queue.issue_indexer", "TYPE", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_DIR", "queue.issue_indexer", "DATADIR", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR", "queue.issue_indexer", "CONN_STR", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER", "queue.issue_indexer", "BATCH_LENGTH", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "UPDATE_BUFFER_LEN", "queue.issue_indexer", "LENGTH", "v1.19.0")
-
Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
Indexer.RepoType = sec.Key("REPO_INDEXER_TYPE").MustString("bleve")
Indexer.RepoPath = filepath.ToSlash(sec.Key("REPO_INDEXER_PATH").MustString(filepath.ToSlash(filepath.Join(AppDataPath, "indexers/repos.bleve"))))
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index 8c37e538bb..8673537b52 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -5,198 +5,109 @@ package setting
import (
"path/filepath"
- "strconv"
- "time"
- "code.gitea.io/gitea/modules/container"
+ "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
// QueueSettings represent the settings for a queue from the ini
type QueueSettings struct {
- Name string
- DataDir string
- QueueLength int `ini:"LENGTH"`
- BatchLength int
- ConnectionString string
- Type string
- QueueName string
- SetName string
- WrapIfNecessary bool
- MaxAttempts int
- Timeout time.Duration
- Workers int
- MaxWorkers int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
-}
+ Name string // not an INI option, it is the name for [queue.the-name] section
-// Queue settings
-var Queue = QueueSettings{}
+ Type string
+ Datadir string
+ ConnStr string // for leveldb or redis
+ Length int // max queue length before blocking
-// GetQueueSettings returns the queue settings for the appropriately named queue
-func GetQueueSettings(name string) QueueSettings {
- return getQueueSettings(CfgProvider, name)
-}
+ QueueName, SetName string // the name suffix for storage (db key, redis key), "set" is for unique queue
-func getQueueSettings(rootCfg ConfigProvider, name string) QueueSettings {
- q := QueueSettings{}
- sec := rootCfg.Section("queue." + name)
- q.Name = name
-
- // DataDir is not directly inheritable
- q.DataDir = filepath.ToSlash(filepath.Join(Queue.DataDir, "common"))
- // QueueName is not directly inheritable either
- q.QueueName = name + Queue.QueueName
- for _, key := range sec.Keys() {
- switch key.Name() {
- case "DATADIR":
- q.DataDir = key.MustString(q.DataDir)
- case "QUEUE_NAME":
- q.QueueName = key.MustString(q.QueueName)
- case "SET_NAME":
- q.SetName = key.MustString(q.SetName)
- }
- }
- if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
- q.SetName = q.QueueName + Queue.SetName
- }
- if !filepath.IsAbs(q.DataDir) {
- q.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, q.DataDir))
- }
- _, _ = sec.NewKey("DATADIR", q.DataDir)
-
- // The rest are...
- q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
- q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
- q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
- q.Type = sec.Key("TYPE").MustString(Queue.Type)
- q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
- q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
- q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
- q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
- q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
- q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
- q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
- q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
-
- return q
+ BatchLength int
+ MaxWorkers int
}
-// LoadQueueSettings sets up the default settings for Queues
-// This is exported for tests to be able to use the queue
-func LoadQueueSettings() {
- loadQueueFrom(CfgProvider)
+var queueSettingsDefault = QueueSettings{
+ Type: "level", // dummy, channel, level, redis
+ Datadir: "queues/common", // relative to AppDataPath
+ Length: 100, // queue length before a channel queue will block
+
+ QueueName: "_queue",
+ SetName: "_unique",
+ BatchLength: 20,
+ MaxWorkers: 10,
}
-func loadQueueFrom(rootCfg ConfigProvider) {
- sec := rootCfg.Section("queue")
- Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
- if !filepath.IsAbs(Queue.DataDir) {
- Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
+func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
+ // deep copy default settings
+ cfg := QueueSettings{}
+ if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
+ return cfg, err
+ } else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
+ return cfg, err
}
- Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
- Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
- Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
- defaultType := sec.Key("TYPE").String()
- Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
- Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
- Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
- Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
- Queue.Workers = sec.Key("WORKERS").MustInt(0)
- Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
- Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
- Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
- Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(1)
- Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
- Queue.SetName = sec.Key("SET_NAME").MustString("")
-
- // Now handle the old issue_indexer configuration
- // FIXME: DEPRECATED to be removed in v1.18.0
- section := rootCfg.Section("queue.issue_indexer")
- directlySet := toDirectlySetKeysSet(section)
- if !directlySet.Contains("TYPE") && defaultType == "" {
- switch typ := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(""); typ {
- case "levelqueue":
- _, _ = section.NewKey("TYPE", "level")
- case "channel":
- _, _ = section.NewKey("TYPE", "persistable-channel")
- case "redis":
- _, _ = section.NewKey("TYPE", "redis")
- case "":
- _, _ = section.NewKey("TYPE", "level")
- default:
- log.Fatal("Unsupported indexer queue type: %v", typ)
+
+ cfg.Name = name
+ if sec, err := rootCfg.GetSection("queue"); err == nil {
+ if err = sec.MapTo(&cfg); err != nil {
+ log.Error("Failed to map queue common config for %q: %v", name, err)
+ return cfg, nil
}
}
- if !directlySet.Contains("LENGTH") {
- length := rootCfg.Section("indexer").Key("UPDATE_BUFFER_LEN").MustInt(0)
- if length != 0 {
- _, _ = section.NewKey("LENGTH", strconv.Itoa(length))
+ if sec, err := rootCfg.GetSection("queue." + name); err == nil {
+ if err = sec.MapTo(&cfg); err != nil {
+ log.Error("Failed to map queue spec config for %q: %v", name, err)
+ return cfg, nil
}
- }
- if !directlySet.Contains("BATCH_LENGTH") {
- fallback := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(0)
- if fallback != 0 {
- _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(fallback))
+ if sec.HasKey("CONN_STR") {
+ cfg.ConnStr = sec.Key("CONN_STR").String()
}
}
- if !directlySet.Contains("DATADIR") {
- queueDir := filepath.ToSlash(rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_DIR").MustString(""))
- if queueDir != "" {
- _, _ = section.NewKey("DATADIR", queueDir)
- }
+
+ if cfg.Datadir == "" {
+ cfg.Datadir = queueSettingsDefault.Datadir
}
- if !directlySet.Contains("CONN_STR") {
- connStr := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString("")
- if connStr != "" {
- _, _ = section.NewKey("CONN_STR", connStr)
- }
+ if !filepath.IsAbs(cfg.Datadir) {
+ cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
}
+ cfg.Datadir = filepath.ToSlash(cfg.Datadir)
- // FIXME: DEPRECATED to be removed in v1.18.0
- // - will need to set default for [queue.*)] LENGTH appropriately though though
-
- // Handle the old mailer configuration
- handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN", 100)
-
- // Handle the old test pull requests configuration
- // Please note this will be a unique queue
- handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH", 1000)
-
- // Handle the old mirror queue configuration
- // Please note this will be a unique queue
- handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH", 1000)
-}
-
-// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
-// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
-func handleOldLengthConfiguration(rootCfg ConfigProvider, queueName, oldSection, oldKey string, defaultValue int) {
- if rootCfg.Section(oldSection).HasKey(oldKey) {
- log.Error("Deprecated fallback for %s queue length `[%s]` `%s` present. Use `[queue.%s]` `LENGTH`. This will be removed in v1.18.0", queueName, queueName, oldSection, oldKey)
+ if cfg.Type == "redis" && cfg.ConnStr == "" {
+ cfg.ConnStr = "redis://127.0.0.1:6379/0"
}
- value := rootCfg.Section(oldSection).Key(oldKey).MustInt(defaultValue)
- // Don't override with 0
- if value <= 0 {
- return
+ if cfg.Length <= 0 {
+ cfg.Length = queueSettingsDefault.Length
}
-
- section := rootCfg.Section("queue." + queueName)
- directlySet := toDirectlySetKeysSet(section)
- if !directlySet.Contains("LENGTH") {
- _, _ = section.NewKey("LENGTH", strconv.Itoa(value))
+ if cfg.MaxWorkers <= 0 {
+ cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
}
+ if cfg.BatchLength <= 0 {
+ cfg.BatchLength = queueSettingsDefault.BatchLength
+ }
+
+ return cfg, nil
}
-// toDirectlySetKeysSet returns a set of keys directly set by this section
-// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
-// but this section does not.
-func toDirectlySetKeysSet(section ConfigSection) container.Set[string] {
- sections := make(container.Set[string])
- for _, key := range section.Keys() {
- sections.Add(key.Name())
+func LoadQueueSettings() {
+ loadQueueFrom(CfgProvider)
+}
+
+func loadQueueFrom(rootCfg ConfigProvider) {
+ hasOld := false
+ handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
+ if rootCfg.Section(oldSection).HasKey(oldKey) {
+ hasOld = true
+ log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName)
+ }
+ }
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN")
+ handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN")
+ handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH")
+ handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
+ if hasOld {
+ log.Fatal("Please update your app.ini to remove deprecated config options")
}
- return sections
}
diff --git a/modules/setting/storage_test.go b/modules/setting/storage_test.go
index 9c51bbc081..5e213606e3 100644
--- a/modules/setting/storage_test.go
+++ b/modules/setting/storage_test.go
@@ -19,7 +19,7 @@ MINIO_BUCKET = gitea-attachment
STORAGE_TYPE = minio
MINIO_ENDPOINT = my_minio:9000
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -42,7 +42,7 @@ MINIO_BUCKET = gitea-attachment
[storage.minio]
MINIO_BUCKET = gitea
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -64,7 +64,7 @@ MINIO_BUCKET = gitea-minio
[storage]
MINIO_BUCKET = gitea
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -87,7 +87,7 @@ MINIO_BUCKET = gitea
[storage]
STORAGE_TYPE = local
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -99,7 +99,7 @@ STORAGE_TYPE = local
}
func Test_getStorageGetDefaults(t *testing.T) {
- cfg, err := newConfigProviderFromData("")
+ cfg, err := NewConfigProviderFromData("")
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -120,7 +120,7 @@ MINIO_BUCKET = gitea-attachment
[storage]
MINIO_BUCKET = gitea-storage
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
{
@@ -154,7 +154,7 @@ STORAGE_TYPE = lfs
[storage.lfs]
MINIO_BUCKET = gitea-storage
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
{
@@ -178,7 +178,7 @@ func Test_getStorageInheritStorageType(t *testing.T) {
[storage]
STORAGE_TYPE = minio
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -193,7 +193,7 @@ func Test_getStorageInheritNameSectionType(t *testing.T) {
[storage.attachments]
STORAGE_TYPE = minio
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")