author | wxiaoguang <wxiaoguang@gmail.com> | 2023-05-08 19:49:59 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-08 19:49:59 +0800 |
commit | 6f9c278559789066aa831c1df25b0d866103d02d (patch) | |
tree | e3a1880e0d4cf88916f9d1b65d82fbd4c41ea47f /modules/setting | |
parent | cb700aedd1e670fb47b8cf0cd67fb117a1ad88a2 (diff) | |
download | gitea-6f9c278559789066aa831c1df25b0d866103d02d.tar.gz gitea-6f9c278559789066aa831c1df25b0d866103d02d.zip |
Rewrite queue (#24505)
# ⚠️ Breaking
Many deprecated queue config options are removed (actually, they should
have been removed in 1.18/1.19).
If you see the fatal message when starting Gitea: "Please update your
app.ini to remove deprecated config options", please follow the error
messages to remove these options from your app.ini.
Example:
```
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options
```
Many options in `[queue]` are dropped, including `WRAP_IF_NECESSARY`,
`MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`, `BLOCK_TIMEOUT`, `BOOST_TIMEOUT`,
and `BOOST_WORKERS`. They can be removed from app.ini.
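To find leftover options before upgrading, a small standalone checker can help. The following is only a sketch: `findRemovedOptions` and `removedOptions` are hypothetical names, the INI parsing is deliberately naive (no multi-line values, case-sensitive keys), and treating `[queue.*]` subsections like `[queue]` is an assumption, not something this commit states.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// removedOptions lists the option names mentioned in this commit message
// and diff, grouped by section. The grouping is illustrative only.
var removedOptions = map[string][]string{
	"indexer": {
		"ISSUE_INDEXER_QUEUE_TYPE", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER",
		"ISSUE_INDEXER_QUEUE_DIR", "ISSUE_INDEXER_QUEUE_CONN_STR", "UPDATE_BUFFER_LEN",
	},
	"mailer":     {"SEND_BUFFER_LEN"},
	"repository": {"PULL_REQUEST_QUEUE_LENGTH", "MIRROR_QUEUE_LENGTH"},
	"queue": {
		"WRAP_IF_NECESSARY", "MAX_ATTEMPTS", "TIMEOUT", "WORKERS",
		"BLOCK_TIMEOUT", "BOOST_TIMEOUT", "BOOST_WORKERS",
	},
}

// findRemovedOptions scans INI text and returns every "[section].KEY"
// that appears in removedOptions, in file order.
func findRemovedOptions(iniText string) []string {
	var found []string
	section := ""
	sc := bufio.NewScanner(strings.NewReader(iniText))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, ";") || strings.HasPrefix(line, "#") {
			continue // blank line or comment
		}
		if strings.HasPrefix(line, "[") && strings.HasSuffix(line, "]") {
			section = strings.TrimSpace(line[1 : len(line)-1])
			continue
		}
		key, _, ok := strings.Cut(line, "=")
		if !ok {
			continue
		}
		key = strings.TrimSpace(key)
		lookup := section
		if strings.HasPrefix(section, "queue.") {
			lookup = "queue" // assumption: subsections carry the same dropped keys
		}
		for _, k := range removedOptions[lookup] {
			if k == key {
				found = append(found, "["+section+"]."+key)
			}
		}
	}
	return found
}

func main() {
	sample := "[indexer]\nUPDATE_BUFFER_LEN = 20\n\n[queue]\nLENGTH = 100\nBOOST_WORKERS = 1\n"
	for _, opt := range findRemovedOptions(sample) {
		fmt.Println("remove from app.ini:", opt)
	}
}
```

Gitea itself performs the fatal check only for the old `[indexer]`, `[mailer]`, and `[repository]` keys; the `[queue]` entries are included here just to help with cleanup.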
# The problem
The old queue package has some legacy problems:
* complexity: I doubt many people could explain how it works.
* maintainability: too many channels and mutexes/conds are mixed
together, and too many different structs/interfaces depend on each other.
* stability: because of the complexity and poor maintainability, strange
bugs sometimes appear that are difficult to debug, and some code has no
tests (indeed, some code is difficult to test because so many things are
mixed together).
* general applicability: although it is called a "queue", it does not
behave like a well-known queue.
* scalability: it does not seem easy to make it work with a cluster
without breaking its behavior.
It came from some very old code and was kept that way to "avoid
breaking" things; however, its technical debt is now too heavy. It's a
good time to introduce a better "queue" package.
# The new queue package
It keeps the old config and concepts as much as possible.
* It only contains two major kinds of concepts:
    * The "base queue": channel, levelqueue, redis
        * They have the same abstraction, the same interface, and they
          are tested by the same testing code.
    * The "WorkerPoolQueue": it uses the "base queue" to provide the
      "worker pool" function and calls the "handler" to process the data
      in the base queue.
* The new code doesn't do "PushBack"
    * Think about a queue with many workers: "PushBack" can't guarantee
      the order of re-queued unhandled items, so the new code just does
      a "normal push".
* The new code doesn't do "pause/resume"
    * "pause/resume" was designed to handle a handler's failure, e.g.
      the document indexer (elasticsearch) is down.
    * If a queue is paused for a long time, either the producers block
      or the new items are dropped.
    * The new code doesn't do such a "pause/resume" trick: it's not
      common queue behavior and it doesn't help much.
    * If there are unhandled items, the "push" function just blocks for
      a few seconds, then re-queues them and retries.
* The new code doesn't do "worker booster"
    * Gitea's queue handlers are light functions and the only cost is
      the goroutine, so it doesn't make sense to "boost" them.
    * The new code only uses the "max worker number" to limit the
      concurrent workers.
* The new "Push" never blocks forever
    * Instead of creating more and more blocking goroutines, returning
      an error is friendlier to the server and to the end user.
There are more details in code comments: eg: the "Flush" problem, the
strange "code.index" hanging problem, the "immediate" queue problem.
Almost ready for review.
TODO:
* [x] add some necessary comments during review
* [x] add some more tests if necessary
* [x] update documents and config options
* [x] test max worker / active worker
* [x] re-run the CI tasks to see whether any test is flaky
* [x] improve the `handleOldLengthConfiguration` to provide more
friendly messages
* [x] fine tune default config values (eg: length?)
## Code coverage:
![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
Diffstat (limited to 'modules/setting')
-rw-r--r-- | modules/setting/config_provider.go | 6 |
-rw-r--r-- | modules/setting/cron_test.go | 2 |
-rw-r--r-- | modules/setting/indexer.go | 9 |
-rw-r--r-- | modules/setting/queue.go | 241 |
-rw-r--r-- | modules/setting/storage_test.go | 18 |
5 files changed, 89 insertions, 187 deletions
diff --git a/modules/setting/config_provider.go b/modules/setting/config_provider.go
index 1685958298..24825a6205 100644
--- a/modules/setting/config_provider.go
+++ b/modules/setting/config_provider.go
@@ -42,12 +42,12 @@ type iniFileConfigProvider struct {

 // NewEmptyConfigProvider create a new empty config provider
 func NewEmptyConfigProvider() ConfigProvider {
-	cp, _ := newConfigProviderFromData("")
+	cp, _ := NewConfigProviderFromData("")
 	return cp
 }

-// newConfigProviderFromData this function is only for testing
-func newConfigProviderFromData(configContent string) (ConfigProvider, error) {
+// NewConfigProviderFromData this function is only for testing
+func NewConfigProviderFromData(configContent string) (ConfigProvider, error) {
 	var cfg *ini.File
 	var err error
 	if configContent == "" {
diff --git a/modules/setting/cron_test.go b/modules/setting/cron_test.go
index 8d58cf8b48..3187ab18a2 100644
--- a/modules/setting/cron_test.go
+++ b/modules/setting/cron_test.go
@@ -26,7 +26,7 @@ BASE = true
 SECOND = white rabbit
 EXTEND = true
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	extended := &Extended{
diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go
index 8aee8596de..6836e62311 100644
--- a/modules/setting/indexer.go
+++ b/modules/setting/indexer.go
@@ -70,15 +70,6 @@ func loadIndexerFrom(rootCfg ConfigProvider) {

 	Indexer.IssueIndexerName = sec.Key("ISSUE_INDEXER_NAME").MustString(Indexer.IssueIndexerName)

-	// The following settings are deprecated and can be overridden by settings in [queue] or [queue.issue_indexer]
-	// DEPRECATED should not be removed because users maybe upgrade from lower version to the latest version
-	// if these are removed, the warning will not be shown
-	deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_TYPE", "queue.issue_indexer", "TYPE", "v1.19.0")
-	deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_DIR", "queue.issue_indexer", "DATADIR", "v1.19.0")
-	deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR", "queue.issue_indexer", "CONN_STR", "v1.19.0")
-	deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER", "queue.issue_indexer", "BATCH_LENGTH", "v1.19.0")
-	deprecatedSetting(rootCfg, "indexer", "UPDATE_BUFFER_LEN", "queue.issue_indexer", "LENGTH", "v1.19.0")
-
 	Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
 	Indexer.RepoType = sec.Key("REPO_INDEXER_TYPE").MustString("bleve")
 	Indexer.RepoPath = filepath.ToSlash(sec.Key("REPO_INDEXER_PATH").MustString(filepath.ToSlash(filepath.Join(AppDataPath, "indexers/repos.bleve"))))
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index 8c37e538bb..8673537b52 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -5,198 +5,109 @@ package setting

 import (
 	"path/filepath"
-	"strconv"
-	"time"

-	"code.gitea.io/gitea/modules/container"
+	"code.gitea.io/gitea/modules/json"
 	"code.gitea.io/gitea/modules/log"
 )

 // QueueSettings represent the settings for a queue from the ini
 type QueueSettings struct {
-	Name             string
-	DataDir          string
-	QueueLength      int `ini:"LENGTH"`
-	BatchLength      int
-	ConnectionString string
-	Type             string
-	QueueName        string
-	SetName          string
-	WrapIfNecessary  bool
-	MaxAttempts      int
-	Timeout          time.Duration
-	Workers          int
-	MaxWorkers       int
-	BlockTimeout     time.Duration
-	BoostTimeout     time.Duration
-	BoostWorkers     int
-}
+	Name string // not an INI option, it is the name for [queue.the-name] section

-// Queue settings
-var Queue = QueueSettings{}
+	Type    string
+	Datadir string
+	ConnStr string // for leveldb or redis
+	Length  int    // max queue length before blocking

-// GetQueueSettings returns the queue settings for the appropriately named queue
-func GetQueueSettings(name string) QueueSettings {
-	return getQueueSettings(CfgProvider, name)
-}
+	QueueName, SetName string // the name suffix for storage (db key, redis key), "set" is for unique queue

-func getQueueSettings(rootCfg ConfigProvider, name string) QueueSettings {
-	q := QueueSettings{}
-	sec := rootCfg.Section("queue." + name)
-	q.Name = name
-
-	// DataDir is not directly inheritable
-	q.DataDir = filepath.ToSlash(filepath.Join(Queue.DataDir, "common"))
-	// QueueName is not directly inheritable either
-	q.QueueName = name + Queue.QueueName
-	for _, key := range sec.Keys() {
-		switch key.Name() {
-		case "DATADIR":
-			q.DataDir = key.MustString(q.DataDir)
-		case "QUEUE_NAME":
-			q.QueueName = key.MustString(q.QueueName)
-		case "SET_NAME":
-			q.SetName = key.MustString(q.SetName)
-		}
-	}
-	if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
-		q.SetName = q.QueueName + Queue.SetName
-	}
-	if !filepath.IsAbs(q.DataDir) {
-		q.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, q.DataDir))
-	}
-	_, _ = sec.NewKey("DATADIR", q.DataDir)
-
-	// The rest are...
-	q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
-	q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
-	q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
-	q.Type = sec.Key("TYPE").MustString(Queue.Type)
-	q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
-	q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
-	q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
-	q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
-	q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
-	q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
-	q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
-	q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
-
-	return q
+	BatchLength int
+	MaxWorkers  int
 }

-// LoadQueueSettings sets up the default settings for Queues
-// This is exported for tests to be able to use the queue
-func LoadQueueSettings() {
-	loadQueueFrom(CfgProvider)
+var queueSettingsDefault = QueueSettings{
+	Type:    "level",         // dummy, channel, level, redis
+	Datadir: "queues/common", // relative to AppDataPath
+	Length:  100,             // queue length before a channel queue will block
+
+	QueueName:   "_queue",
+	SetName:     "_unique",
+	BatchLength: 20,
+	MaxWorkers:  10,
 }

-func loadQueueFrom(rootCfg ConfigProvider) {
-	sec := rootCfg.Section("queue")
-	Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
-	if !filepath.IsAbs(Queue.DataDir) {
-		Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
+func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
+	// deep copy default settings
+	cfg := QueueSettings{}
+	if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
+		return cfg, err
+	} else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
+		return cfg, err
 	}
-	Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
-	Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
-	Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
-	defaultType := sec.Key("TYPE").String()
-	Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
-	Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
-	Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
-	Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
-	Queue.Workers = sec.Key("WORKERS").MustInt(0)
-	Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
-	Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
-	Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
-	Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(1)
-	Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
-	Queue.SetName = sec.Key("SET_NAME").MustString("")
-
-	// Now handle the old issue_indexer configuration
-	// FIXME: DEPRECATED to be removed in v1.18.0
-	section := rootCfg.Section("queue.issue_indexer")
-	directlySet := toDirectlySetKeysSet(section)
-	if !directlySet.Contains("TYPE") && defaultType == "" {
-		switch typ := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(""); typ {
-		case "levelqueue":
-			_, _ = section.NewKey("TYPE", "level")
-		case "channel":
-			_, _ = section.NewKey("TYPE", "persistable-channel")
-		case "redis":
-			_, _ = section.NewKey("TYPE", "redis")
-		case "":
-			_, _ = section.NewKey("TYPE", "level")
-		default:
-			log.Fatal("Unsupported indexer queue type: %v", typ)
+
+	cfg.Name = name
+	if sec, err := rootCfg.GetSection("queue"); err == nil {
+		if err = sec.MapTo(&cfg); err != nil {
+			log.Error("Failed to map queue common config for %q: %v", name, err)
+			return cfg, nil
 		}
 	}
-	if !directlySet.Contains("LENGTH") {
-		length := rootCfg.Section("indexer").Key("UPDATE_BUFFER_LEN").MustInt(0)
-		if length != 0 {
-			_, _ = section.NewKey("LENGTH", strconv.Itoa(length))
+	if sec, err := rootCfg.GetSection("queue." + name); err == nil {
+		if err = sec.MapTo(&cfg); err != nil {
+			log.Error("Failed to map queue spec config for %q: %v", name, err)
+			return cfg, nil
 		}
-	}
-	if !directlySet.Contains("BATCH_LENGTH") {
-		fallback := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(0)
-		if fallback != 0 {
-			_, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(fallback))
+		if sec.HasKey("CONN_STR") {
+			cfg.ConnStr = sec.Key("CONN_STR").String()
 		}
 	}
-	if !directlySet.Contains("DATADIR") {
-		queueDir := filepath.ToSlash(rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_DIR").MustString(""))
-		if queueDir != "" {
-			_, _ = section.NewKey("DATADIR", queueDir)
-		}
+
+	if cfg.Datadir == "" {
+		cfg.Datadir = queueSettingsDefault.Datadir
 	}
-	if !directlySet.Contains("CONN_STR") {
-		connStr := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString("")
-		if connStr != "" {
-			_, _ = section.NewKey("CONN_STR", connStr)
-		}
+	if !filepath.IsAbs(cfg.Datadir) {
+		cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
 	}
+	cfg.Datadir = filepath.ToSlash(cfg.Datadir)

-	// FIXME: DEPRECATED to be removed in v1.18.0
-	// - will need to set default for [queue.*)] LENGTH appropriately though though
-
-	// Handle the old mailer configuration
-	handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN", 100)
-
-	// Handle the old test pull requests configuration
-	// Please note this will be a unique queue
-	handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH", 1000)
-
-	// Handle the old mirror queue configuration
-	// Please note this will be a unique queue
-	handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH", 1000)
-}
-
-// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
-// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
-func handleOldLengthConfiguration(rootCfg ConfigProvider, queueName, oldSection, oldKey string, defaultValue int) {
-	if rootCfg.Section(oldSection).HasKey(oldKey) {
-		log.Error("Deprecated fallback for %s queue length `[%s]` `%s` present. Use `[queue.%s]` `LENGTH`. This will be removed in v1.18.0", queueName, queueName, oldSection, oldKey)
+	if cfg.Type == "redis" && cfg.ConnStr == "" {
+		cfg.ConnStr = "redis://127.0.0.1:6379/0"
 	}
-	value := rootCfg.Section(oldSection).Key(oldKey).MustInt(defaultValue)

-	// Don't override with 0
-	if value <= 0 {
-		return
+	if cfg.Length <= 0 {
+		cfg.Length = queueSettingsDefault.Length
 	}
-
-	section := rootCfg.Section("queue." + queueName)
-	directlySet := toDirectlySetKeysSet(section)
-	if !directlySet.Contains("LENGTH") {
-		_, _ = section.NewKey("LENGTH", strconv.Itoa(value))
+	if cfg.MaxWorkers <= 0 {
+		cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
 	}
+	if cfg.BatchLength <= 0 {
+		cfg.BatchLength = queueSettingsDefault.BatchLength
+	}
+
+	return cfg, nil
 }

-// toDirectlySetKeysSet returns a set of keys directly set by this section
-// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
-// but this section does not.
-func toDirectlySetKeysSet(section ConfigSection) container.Set[string] {
-	sections := make(container.Set[string])
-	for _, key := range section.Keys() {
-		sections.Add(key.Name())
+func LoadQueueSettings() {
+	loadQueueFrom(CfgProvider)
+}
+
+func loadQueueFrom(rootCfg ConfigProvider) {
+	hasOld := false
+	handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
+		if rootCfg.Section(oldSection).HasKey(oldKey) {
+			hasOld = true
+			log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName)
+		}
+	}
+	handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE")
+	handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER")
+	handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR")
+	handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR")
+	handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN")
+	handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN")
+	handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH")
+	handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
+	if hasOld {
+		log.Fatal("Please update your app.ini to remove deprecated config options")
 	}
-	return sections
 }
diff --git a/modules/setting/storage_test.go b/modules/setting/storage_test.go
index 9c51bbc081..5e213606e3 100644
--- a/modules/setting/storage_test.go
+++ b/modules/setting/storage_test.go
@@ -19,7 +19,7 @@ MINIO_BUCKET = gitea-attachment
 STORAGE_TYPE = minio
 MINIO_ENDPOINT = my_minio:9000
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	sec := cfg.Section("attachment")
@@ -42,7 +42,7 @@ MINIO_BUCKET = gitea-attachment
 [storage.minio]
 MINIO_BUCKET = gitea
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	sec := cfg.Section("attachment")
@@ -64,7 +64,7 @@ MINIO_BUCKET = gitea-minio
 [storage]
 MINIO_BUCKET = gitea
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	sec := cfg.Section("attachment")
@@ -87,7 +87,7 @@ MINIO_BUCKET = gitea
 [storage]
 STORAGE_TYPE = local
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	sec := cfg.Section("attachment")
@@ -99,7 +99,7 @@ STORAGE_TYPE = local
 }

 func Test_getStorageGetDefaults(t *testing.T) {
-	cfg, err := newConfigProviderFromData("")
+	cfg, err := NewConfigProviderFromData("")
 	assert.NoError(t, err)

 	sec := cfg.Section("attachment")
@@ -120,7 +120,7 @@ MINIO_BUCKET = gitea-attachment
 [storage]
 MINIO_BUCKET = gitea-storage
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	{
@@ -154,7 +154,7 @@ STORAGE_TYPE = lfs
 [storage.lfs]
 MINIO_BUCKET = gitea-storage
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	{
@@ -178,7 +178,7 @@ func Test_getStorageInheritStorageType(t *testing.T) {
 [storage]
 STORAGE_TYPE = minio
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	sec := cfg.Section("attachment")
@@ -193,7 +193,7 @@ func Test_getStorageInheritNameSectionType(t *testing.T) {
 [storage.attachments]
 STORAGE_TYPE = minio
 `
-	cfg, err := newConfigProviderFromData(iniStr)
+	cfg, err := NewConfigProviderFromData(iniStr)
 	assert.NoError(t, err)

 	sec := cfg.Section("attachment")
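
The layering that the new `GetQueueSettings` performs (defaults, then `[queue]`, then `[queue.<name>]`, then fallbacks for empty or non-positive values) can be tried outside Gitea with a rough standalone sketch. Everything here is an assumption for illustration: `getQueueSettings` and `applySection` are hypothetical helpers, the parsed INI file is modelled as nested maps instead of Gitea's `ConfigProvider`, and `QueueName`/`SetName` are left out for brevity.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
)

// queueSettings mirrors a subset of the fields of the new QueueSettings struct.
type queueSettings struct {
	Name        string
	Type        string
	Datadir     string
	ConnStr     string
	Length      int
	BatchLength int
	MaxWorkers  int
}

// defaults uses the same values as queueSettingsDefault in the diff.
var defaults = queueSettings{
	Type: "level", Datadir: "queues/common",
	Length: 100, BatchLength: 20, MaxWorkers: 10,
}

// applySection overlays the keys that are present in sec onto cfg.
func applySection(cfg *queueSettings, sec map[string]string) {
	if v, ok := sec["TYPE"]; ok {
		cfg.Type = v
	}
	if v, ok := sec["DATADIR"]; ok {
		cfg.Datadir = v
	}
	if v, ok := sec["CONN_STR"]; ok {
		cfg.ConnStr = v
	}
	ints := map[string]*int{
		"LENGTH": &cfg.Length, "BATCH_LENGTH": &cfg.BatchLength, "MAX_WORKERS": &cfg.MaxWorkers,
	}
	for key, dst := range ints {
		if n, err := strconv.Atoi(sec[key]); err == nil {
			*dst = n // missing or non-numeric values are ignored
		}
	}
}

// getQueueSettings resolves the settings for one named queue.
func getQueueSettings(ini map[string]map[string]string, appDataPath, name string) queueSettings {
	cfg := defaults // a plain struct copy is enough here: no pointers or slices
	cfg.Name = name
	applySection(&cfg, ini["queue"])       // common settings
	applySection(&cfg, ini["queue."+name]) // per-queue overrides

	if cfg.Datadir == "" {
		cfg.Datadir = defaults.Datadir
	}
	if !filepath.IsAbs(cfg.Datadir) {
		cfg.Datadir = filepath.Join(appDataPath, cfg.Datadir)
	}
	cfg.Datadir = filepath.ToSlash(cfg.Datadir)

	if cfg.Type == "redis" && cfg.ConnStr == "" {
		cfg.ConnStr = "redis://127.0.0.1:6379/0"
	}
	if cfg.Length <= 0 {
		cfg.Length = defaults.Length
	}
	if cfg.MaxWorkers <= 0 {
		cfg.MaxWorkers = defaults.MaxWorkers
	}
	if cfg.BatchLength <= 0 {
		cfg.BatchLength = defaults.BatchLength
	}
	return cfg
}

func main() {
	ini := map[string]map[string]string{
		"queue":        {"TYPE": "redis", "LENGTH": "50"},
		"queue.mailer": {"LENGTH": "0", "MAX_WORKERS": "3"},
	}
	fmt.Printf("%+v\n", getQueueSettings(ini, "/data", "mailer"))
}
```

In this sample the per-queue `LENGTH = 0` overrides the common value of 50 and is then replaced by the default, which matches the non-positive fallback in the diff.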