diff options
author | wxiaoguang <wxiaoguang@gmail.com> | 2023-05-08 19:49:59 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-08 19:49:59 +0800 |
commit | 6f9c278559789066aa831c1df25b0d866103d02d (patch) | |
tree | e3a1880e0d4cf88916f9d1b65d82fbd4c41ea47f /tests | |
parent | cb700aedd1e670fb47b8cf0cd67fb117a1ad88a2 (diff) | |
download | gitea-6f9c278559789066aa831c1df25b0d866103d02d.tar.gz gitea-6f9c278559789066aa831c1df25b0d866103d02d.zip |
Rewrite queue (#24505)
# ⚠️ Breaking
Many deprecated queue config options are removed (actually, they should
have been removed in 1.18/1.19).
If you see the fatal message when starting Gitea: "Please update your
app.ini to remove deprecated config options", please follow the error
messages to remove these options from your app.ini.
Example:
```
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options
```
Many options in `[queue]` are are dropped, including:
`WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`,
`BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed
from app.ini.
# The problem
The old queue package has some legacy problems:
* complexity: I doubt few people could tell how it works.
* maintainability: Too many channels and mutex/cond are mixed together,
too many different structs/interfaces depends each other.
* stability: due to the complexity & maintainability, sometimes there
are strange bugs and difficult to debug, and some code doesn't have test
(indeed some code is difficult to test because a lot of things are mixed
together).
* general applicability: although it is called "queue", its behavior is
not a well-known queue.
* scalability: it doesn't seem easy to make it work with a cluster
without breaking its behaviors.
It came from some very old code to "avoid breaking", however, its
technical debt is too heavy now. It's a good time to introduce a better
"queue" package.
# The new queue package
It keeps using old config and concept as much as possible.
* It only contains two major kinds of concepts:
* The "base queue": channel, levelqueue, redis
* They have the same abstraction, the same interface, and they are
tested by the same testing code.
* The "WokerPoolQueue", it uses the "base queue" to provide "worker
pool" function, calls the "handler" to process the data in the base
queue.
* The new code doesn't do "PushBack"
* Think about a queue with many workers, the "PushBack" can't guarantee
the order for re-queued unhandled items, so in new code it just does
"normal push"
* The new code doesn't do "pause/resume"
* The "pause/resume" was designed to handle some handler's failure: eg:
document indexer (elasticsearch) is down
* If a queue is paused for long time, either the producers blocks or the
new items are dropped.
* The new code doesn't do such "pause/resume" trick, it's not a common
queue's behavior and it doesn't help much.
* If there are unhandled items, the "push" function just blocks for a
few seconds and then re-queue them and retry.
* The new code doesn't do "worker booster"
* Gitea's queue's handlers are light functions, the cost is only the
go-routine, so it doesn't make sense to "boost" them.
* The new code only use "max worker number" to limit the concurrent
workers.
* The new "Push" never blocks forever
* Instead of creating more and more blocking goroutines, return an error
is more friendly to the server and to the end user.
There are more details in code comments: eg: the "Flush" problem, the
strange "code.index" hanging problem, the "immediate" queue problem.
Almost ready for review.
TODO:
* [x] add some necessary comments during review
* [x] add some more tests if necessary
* [x] update documents and config options
* [x] test max worker / active worker
* [x] re-run the CI tasks to see whether any test is flaky
* [x] improve the `handleOldLengthConfiguration` to provide more
friendly messages
* [x] fine tune default config values (eg: length?)
## Code coverage:
![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
Diffstat (limited to 'tests')
-rw-r--r-- | tests/e2e/e2e_test.go | 3 | ||||
-rw-r--r-- | tests/integration/api_branch_test.go | 1 | ||||
-rw-r--r-- | tests/integration/integration_test.go | 11 | ||||
-rw-r--r-- | tests/mssql.ini.tmpl | 2 | ||||
-rw-r--r-- | tests/mysql.ini.tmpl | 5 | ||||
-rw-r--r-- | tests/mysql8.ini.tmpl | 2 | ||||
-rw-r--r-- | tests/pgsql.ini.tmpl | 2 | ||||
-rw-r--r-- | tests/sqlite.ini.tmpl | 2 | ||||
-rw-r--r-- | tests/test_utils.go | 55 | ||||
-rw-r--r-- | tests/testlogger.go | 201 |
10 files changed, 29 insertions, 255 deletions
diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go index 224b38e728..e75b3db38f 100644 --- a/tests/e2e/e2e_test.go +++ b/tests/e2e/e2e_test.go @@ -21,6 +21,7 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/testlogger" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/web" "code.gitea.io/gitea/routers" @@ -58,7 +59,7 @@ func TestMain(m *testing.M) { exitVal := m.Run() - tests.WriterCloser.Reset() + testlogger.WriterCloser.Reset() if err = util.RemoveAll(setting.Indexer.IssuePath); err != nil { fmt.Printf("util.RemoveAll: %v\n", err) diff --git a/tests/integration/api_branch_test.go b/tests/integration/api_branch_test.go index 0d4a750a29..6616399a8a 100644 --- a/tests/integration/api_branch_test.go +++ b/tests/integration/api_branch_test.go @@ -143,7 +143,6 @@ func testAPICreateBranches(t *testing.T, giteaURL *url.URL) { }, } for _, test := range testCases { - defer tests.ResetFixtures(t) session := ctx.Session testAPICreateBranch(t, session, "user2", "my-noo-repo", test.OldBranch, test.NewBranch, test.ExpectedHTTPStatus) } diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index fbe90ecf78..01f26d567f 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -29,6 +29,7 @@ import ( "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/testlogger" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/web" "code.gitea.io/gitea/routers" @@ -91,21 +92,21 @@ func TestMain(m *testing.M) { // integration test settings... if setting.CfgProvider != nil { testingCfg := setting.CfgProvider.Section("integration-tests") - tests.SlowTest = testingCfg.Key("SLOW_TEST").MustDuration(tests.SlowTest) - tests.SlowFlush = testingCfg.Key("SLOW_FLUSH").MustDuration(tests.SlowFlush) + testlogger.SlowTest = testingCfg.Key("SLOW_TEST").MustDuration(testlogger.SlowTest) + testlogger.SlowFlush = testingCfg.Key("SLOW_FLUSH").MustDuration(testlogger.SlowFlush) } if os.Getenv("GITEA_SLOW_TEST_TIME") != "" { duration, err := time.ParseDuration(os.Getenv("GITEA_SLOW_TEST_TIME")) if err == nil { - tests.SlowTest = duration + testlogger.SlowTest = duration } } if os.Getenv("GITEA_SLOW_FLUSH_TIME") != "" { duration, err := time.ParseDuration(os.Getenv("GITEA_SLOW_FLUSH_TIME")) if err == nil { - tests.SlowFlush = duration + testlogger.SlowFlush = duration } } @@ -130,7 +131,7 @@ func TestMain(m *testing.M) { // Instead, "No tests were found", last nonsense log is "According to the configuration, subsequent logs will not be printed to the console" exitCode := m.Run() - tests.WriterCloser.Reset() + testlogger.WriterCloser.Reset() if err = util.RemoveAll(setting.Indexer.IssuePath); err != nil { fmt.Printf("util.RemoveAll: %v\n", err) diff --git a/tests/mssql.ini.tmpl b/tests/mssql.ini.tmpl index 9cec6169f9..09e75acb01 100644 --- a/tests/mssql.ini.tmpl +++ b/tests/mssql.ini.tmpl @@ -14,7 +14,7 @@ REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mssql/indexers/repos.bleve [queue.issue_indexer] -PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mssql/indexers/issues.bleve +TYPE = level DATADIR = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mssql/indexers/issues.queue [queue] diff --git a/tests/mysql.ini.tmpl b/tests/mysql.ini.tmpl index b286f37bf8..6fcb2792c3 100644 --- a/tests/mysql.ini.tmpl +++ b/tests/mysql.ini.tmpl @@ -12,10 +12,11 @@ SSL_MODE = disable [indexer] REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mysql/indexers/repos.bleve +ISSUE_INDEXER_TYPE = elasticsearch +ISSUE_INDEXER_CONN_STR = http://elastic:changeme@elasticsearch:9200 [queue.issue_indexer] -TYPE = elasticsearch -CONN_STR = http://elastic:changeme@elasticsearch:9200 +TYPE = level DATADIR = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mysql/indexers/issues.queue [queue] diff --git a/tests/mysql8.ini.tmpl b/tests/mysql8.ini.tmpl index f290efe1dc..e37052da8c 100644 --- a/tests/mysql8.ini.tmpl +++ b/tests/mysql8.ini.tmpl @@ -14,7 +14,7 @@ REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mysql8/indexers/repos.bleve [queue.issue_indexer] -PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mysql8/indexers/issues.bleve +TYPE = level DATADIR = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-mysql8/indexers/issues.queue [queue] diff --git a/tests/pgsql.ini.tmpl b/tests/pgsql.ini.tmpl index 15349aa4c2..0f538797c2 100644 --- a/tests/pgsql.ini.tmpl +++ b/tests/pgsql.ini.tmpl @@ -15,7 +15,7 @@ REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-pgsql/indexers/repos.bleve [queue.issue_indexer] -PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-pgsql/indexers/issues.bleve +TYPE = level DATADIR = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-pgsql/indexers/issues.queue [queue] diff --git a/tests/sqlite.ini.tmpl b/tests/sqlite.ini.tmpl index d2e4a2d5ae..24aff9af43 100644 --- a/tests/sqlite.ini.tmpl +++ b/tests/sqlite.ini.tmpl @@ -10,7 +10,7 @@ REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-sqlite/indexers/repos.bleve [queue.issue_indexer] -PATH = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-sqlite/indexers/issues.bleve +TYPE = level DATADIR = tests/{{TEST_TYPE}}/gitea-{{TEST_TYPE}}-sqlite/indexers/issues.queue [queue] diff --git a/tests/test_utils.go b/tests/test_utils.go index c22b2c356c..de022fde0a 100644 --- a/tests/test_utils.go +++ b/tests/test_utils.go @@ -20,10 +20,10 @@ import ( "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/queue" repo_module "code.gitea.io/gitea/modules/repository" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" + "code.gitea.io/gitea/modules/testlogger" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/routers" @@ -61,7 +61,7 @@ func InitTest(requireGitea bool) { _ = os.Setenv("GITEA_CONF", giteaConf) fmt.Printf("Environment variable $GITEA_CONF not set, use default: %s\n", giteaConf) if !setting.EnableSQLite3 { - exitf(`Need to enable SQLite3 for sqlite.ini testing, please set: -tags "sqlite,sqlite_unlock_notify"`) + exitf(`sqlite3 requires: import _ "github.com/mattn/go-sqlite3" or -tags sqlite,sqlite_unlock_notify`) } } @@ -235,45 +235,18 @@ func PrepareTestEnv(t testing.TB, skip ...int) func() { return deferFn } -// ResetFixtures flushes queues, reloads fixtures and resets test repositories within a single test. -// Most tests should call defer tests.PrepareTestEnv(t)() (or have onGiteaRun do that for them) but sometimes -// within a single test this is required -func ResetFixtures(t *testing.T) { - assert.NoError(t, queue.GetManager().FlushAll(context.Background(), -1)) - - // load database fixtures - assert.NoError(t, unittest.LoadFixtures()) - - // load git repo fixtures - assert.NoError(t, util.RemoveAll(setting.RepoRootPath)) - assert.NoError(t, unittest.CopyDir(path.Join(filepath.Dir(setting.AppPath), "tests/gitea-repositories-meta"), setting.RepoRootPath)) - ownerDirs, err := os.ReadDir(setting.RepoRootPath) - if err != nil { - assert.NoError(t, err, "unable to read the new repo root: %v\n", err) - } - for _, ownerDir := range ownerDirs { - if !ownerDir.Type().IsDir() { - continue - } - repoDirs, err := os.ReadDir(filepath.Join(setting.RepoRootPath, ownerDir.Name())) - if err != nil { - assert.NoError(t, err, "unable to read the new repo root: %v\n", err) - } - for _, repoDir := range repoDirs { - _ = os.MkdirAll(filepath.Join(setting.RepoRootPath, ownerDir.Name(), repoDir.Name(), "objects", "pack"), 0o755) - _ = os.MkdirAll(filepath.Join(setting.RepoRootPath, ownerDir.Name(), repoDir.Name(), "objects", "info"), 0o755) - _ = os.MkdirAll(filepath.Join(setting.RepoRootPath, ownerDir.Name(), repoDir.Name(), "refs", "heads"), 0o755) - _ = os.MkdirAll(filepath.Join(setting.RepoRootPath, ownerDir.Name(), repoDir.Name(), "refs", "tag"), 0o755) - } +func PrintCurrentTest(t testing.TB, skip ...int) func() { + if len(skip) == 1 { + skip = []int{skip[0] + 1} } + return testlogger.PrintCurrentTest(t, skip...) +} - // load LFS object fixtures - // (LFS storage can be on any of several backends, including remote servers, so we init it with the storage API) - lfsFixtures, err := storage.NewStorage("", storage.LocalStorageConfig{Path: path.Join(filepath.Dir(setting.AppPath), "tests/gitea-lfs-meta")}) - assert.NoError(t, err) - assert.NoError(t, storage.Clean(storage.LFS)) - assert.NoError(t, lfsFixtures.IterateObjects("", func(path string, _ storage.Object) error { - _, err := storage.Copy(storage.LFS, path, lfsFixtures, path) - return err - })) +// Printf takes a format and args and prints the string to os.Stdout +func Printf(format string, args ...interface{}) { + testlogger.Printf(format, args...) +} + +func init() { + log.Register("test", testlogger.NewTestLogger) } diff --git a/tests/testlogger.go b/tests/testlogger.go deleted file mode 100644 index 7cac5bbe33..0000000000 --- a/tests/testlogger.go +++ /dev/null @@ -1,201 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package tests - -import ( - "context" - "fmt" - "os" - "runtime" - "strings" - "sync" - "testing" - "time" - - "code.gitea.io/gitea/modules/json" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/queue" -) - -var ( - prefix string - SlowTest = 10 * time.Second - SlowFlush = 5 * time.Second -) - -// TestLogger is a logger which will write to the testing log -type TestLogger struct { - log.WriterLogger -} - -var WriterCloser = &testLoggerWriterCloser{} - -type testLoggerWriterCloser struct { - sync.RWMutex - t []*testing.TB -} - -func (w *testLoggerWriterCloser) setT(t *testing.TB) { - w.Lock() - w.t = append(w.t, t) - w.Unlock() -} - -func (w *testLoggerWriterCloser) Write(p []byte) (int, error) { - w.RLock() - var t *testing.TB - if len(w.t) > 0 { - t = w.t[len(w.t)-1] - } - w.RUnlock() - if t != nil && *t != nil { - if len(p) > 0 && p[len(p)-1] == '\n' { - p = p[:len(p)-1] - } - - defer func() { - err := recover() - if err == nil { - return - } - var errString string - errErr, ok := err.(error) - if ok { - errString = errErr.Error() - } else { - errString, ok = err.(string) - } - if !ok { - panic(err) - } - if !strings.HasPrefix(errString, "Log in goroutine after ") { - panic(err) - } - }() - - (*t).Log(string(p)) - return len(p), nil - } - return len(p), nil -} - -func (w *testLoggerWriterCloser) Close() error { - w.Lock() - if len(w.t) > 0 { - w.t = w.t[:len(w.t)-1] - } - w.Unlock() - return nil -} - -func (w *testLoggerWriterCloser) Reset() { - w.Lock() - if len(w.t) > 0 { - for _, t := range w.t { - if t == nil { - continue - } - fmt.Fprintf(os.Stdout, "Unclosed logger writer in test: %s", (*t).Name()) - (*t).Errorf("Unclosed logger writer in test: %s", (*t).Name()) - } - w.t = nil - } - w.Unlock() -} - -// PrintCurrentTest prints the current test to os.Stdout -func PrintCurrentTest(t testing.TB, skip ...int) func() { - start := time.Now() - actualSkip := 1 - if len(skip) > 0 { - actualSkip = skip[0] - } - _, filename, line, _ := runtime.Caller(actualSkip) - - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", fmt.Formatter(log.NewColoredValue(t.Name())), strings.TrimPrefix(filename, prefix), line) - } else { - fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", t.Name(), strings.TrimPrefix(filename, prefix), line) - } - WriterCloser.setT(&t) - return func() { - took := time.Since(start) - if took > SlowTest { - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgYellow)), fmt.Formatter(log.NewColoredValue(took, log.Bold, log.FgYellow))) - } else { - fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", t.Name(), took) - } - } - timer := time.AfterFunc(SlowFlush, func() { - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), SlowFlush) - } else { - fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), SlowFlush) - } - }) - if err := queue.GetManager().FlushAll(context.Background(), 2*time.Minute); err != nil { - t.Errorf("Flushing queues failed with error %v", err) - } - timer.Stop() - flushTook := time.Since(start) - took - if flushTook > SlowFlush { - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), fmt.Formatter(log.NewColoredValue(flushTook, log.Bold, log.FgRed))) - } else { - fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", t.Name(), flushTook) - } - } - _ = WriterCloser.Close() - } -} - -// Printf takes a format and args and prints the string to os.Stdout -func Printf(format string, args ...interface{}) { - if log.CanColorStdout { - for i := 0; i < len(args); i++ { - args[i] = log.NewColoredValue(args[i]) - } - } - fmt.Fprintf(os.Stdout, "\t"+format, args...) -} - -// NewTestLogger creates a TestLogger as a log.LoggerProvider -func NewTestLogger() log.LoggerProvider { - logger := &TestLogger{} - logger.Colorize = log.CanColorStdout - logger.Level = log.TRACE - return logger -} - -// Init inits connection writer with json config. -// json config only need key "level". -func (log *TestLogger) Init(config string) error { - err := json.Unmarshal([]byte(config), log) - if err != nil { - return err - } - log.NewWriterLogger(WriterCloser) - return nil -} - -// Flush when log should be flushed -func (log *TestLogger) Flush() { -} - -// ReleaseReopen does nothing -func (log *TestLogger) ReleaseReopen() error { - return nil -} - -// GetName returns the default name for this implementation -func (log *TestLogger) GetName() string { - return "test" -} - -func init() { - log.Register("test", NewTestLogger) - _, filename, _, _ := runtime.Caller(0) - prefix = strings.TrimSuffix(filename, "tests/integration/testlogger.go") -} |