diff options
author | wxiaoguang <wxiaoguang@gmail.com> | 2023-05-08 19:49:59 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-08 19:49:59 +0800 |
commit | 6f9c278559789066aa831c1df25b0d866103d02d (patch) | |
tree | e3a1880e0d4cf88916f9d1b65d82fbd4c41ea47f /models | |
parent | cb700aedd1e670fb47b8cf0cd67fb117a1ad88a2 (diff) | |
download | gitea-6f9c278559789066aa831c1df25b0d866103d02d.tar.gz gitea-6f9c278559789066aa831c1df25b0d866103d02d.zip |
Rewrite queue (#24505)
# ⚠️ Breaking
Many deprecated queue config options are removed (actually, they should
have been removed in 1.18/1.19).
If you see the fatal message when starting Gitea: "Please update your
app.ini to remove deprecated config options", please follow the error
messages to remove these options from your app.ini.
Example:
```
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options
```
Many options in `[queue]` are are dropped, including:
`WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`,
`BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed
from app.ini.
# The problem
The old queue package has some legacy problems:
* complexity: I doubt few people could tell how it works.
* maintainability: Too many channels and mutex/cond are mixed together,
too many different structs/interfaces depends each other.
* stability: due to the complexity & maintainability, sometimes there
are strange bugs and difficult to debug, and some code doesn't have test
(indeed some code is difficult to test because a lot of things are mixed
together).
* general applicability: although it is called "queue", its behavior is
not a well-known queue.
* scalability: it doesn't seem easy to make it work with a cluster
without breaking its behaviors.
It came from some very old code to "avoid breaking", however, its
technical debt is too heavy now. It's a good time to introduce a better
"queue" package.
# The new queue package
It keeps using old config and concept as much as possible.
* It only contains two major kinds of concepts:
* The "base queue": channel, levelqueue, redis
* They have the same abstraction, the same interface, and they are
tested by the same testing code.
* The "WokerPoolQueue", it uses the "base queue" to provide "worker
pool" function, calls the "handler" to process the data in the base
queue.
* The new code doesn't do "PushBack"
* Think about a queue with many workers, the "PushBack" can't guarantee
the order for re-queued unhandled items, so in new code it just does
"normal push"
* The new code doesn't do "pause/resume"
* The "pause/resume" was designed to handle some handler's failure: eg:
document indexer (elasticsearch) is down
* If a queue is paused for long time, either the producers blocks or the
new items are dropped.
* The new code doesn't do such "pause/resume" trick, it's not a common
queue's behavior and it doesn't help much.
* If there are unhandled items, the "push" function just blocks for a
few seconds and then re-queue them and retry.
* The new code doesn't do "worker booster"
* Gitea's queue's handlers are light functions, the cost is only the
go-routine, so it doesn't make sense to "boost" them.
* The new code only use "max worker number" to limit the concurrent
workers.
* The new "Push" never blocks forever
* Instead of creating more and more blocking goroutines, return an error
is more friendly to the server and to the end user.
There are more details in code comments: eg: the "Flush" problem, the
strange "code.index" hanging problem, the "immediate" queue problem.
Almost ready for review.
TODO:
* [x] add some necessary comments during review
* [x] add some more tests if necessary
* [x] update documents and config options
* [x] test max worker / active worker
* [x] re-run the CI tasks to see whether any test is flaky
* [x] improve the `handleOldLengthConfiguration` to provide more
friendly messages
* [x] fine tune default config values (eg: length?)
## Code coverage:
![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
Diffstat (limited to 'models')
-rw-r--r-- | models/migrations/base/testlogger.go | 180 | ||||
-rw-r--r-- | models/migrations/base/tests.go | 8 | ||||
-rw-r--r-- | models/unittest/testdb.go | 3 |
3 files changed, 6 insertions, 185 deletions
diff --git a/models/migrations/base/testlogger.go b/models/migrations/base/testlogger.go deleted file mode 100644 index 80e672952a..0000000000 --- a/models/migrations/base/testlogger.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package base - -import ( - "context" - "fmt" - "os" - "runtime" - "strings" - "sync" - "testing" - "time" - - "code.gitea.io/gitea/modules/json" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/queue" -) - -var ( - prefix string - slowTest = 10 * time.Second - slowFlush = 5 * time.Second -) - -// TestLogger is a logger which will write to the testing log -type TestLogger struct { - log.WriterLogger -} - -var writerCloser = &testLoggerWriterCloser{} - -type testLoggerWriterCloser struct { - sync.RWMutex - t []*testing.TB -} - -func (w *testLoggerWriterCloser) setT(t *testing.TB) { - w.Lock() - w.t = append(w.t, t) - w.Unlock() -} - -func (w *testLoggerWriterCloser) Write(p []byte) (int, error) { - w.RLock() - var t *testing.TB - if len(w.t) > 0 { - t = w.t[len(w.t)-1] - } - w.RUnlock() - if t != nil && *t != nil { - if len(p) > 0 && p[len(p)-1] == '\n' { - p = p[:len(p)-1] - } - - defer func() { - err := recover() - if err == nil { - return - } - var errString string - errErr, ok := err.(error) - if ok { - errString = errErr.Error() - } else { - errString, ok = err.(string) - } - if !ok { - panic(err) - } - if !strings.HasPrefix(errString, "Log in goroutine after ") { - panic(err) - } - }() - - (*t).Log(string(p)) - return len(p), nil - } - return len(p), nil -} - -func (w *testLoggerWriterCloser) Close() error { - w.Lock() - if len(w.t) > 0 { - w.t = w.t[:len(w.t)-1] - } - w.Unlock() - return nil -} - -// PrintCurrentTest prints the current test to os.Stdout -func PrintCurrentTest(t testing.TB, skip ...int) func() { - start := time.Now() - actualSkip := 1 - if len(skip) > 0 { - actualSkip = skip[0] - } - _, filename, line, _ := runtime.Caller(actualSkip) - - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", fmt.Formatter(log.NewColoredValue(t.Name())), strings.TrimPrefix(filename, prefix), line) - } else { - fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", t.Name(), strings.TrimPrefix(filename, prefix), line) - } - writerCloser.setT(&t) - return func() { - took := time.Since(start) - if took > slowTest { - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgYellow)), fmt.Formatter(log.NewColoredValue(took, log.Bold, log.FgYellow))) - } else { - fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", t.Name(), took) - } - } - timer := time.AfterFunc(slowFlush, func() { - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), slowFlush) - } else { - fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), slowFlush) - } - }) - if err := queue.GetManager().FlushAll(context.Background(), -1); err != nil { - t.Errorf("Flushing queues failed with error %v", err) - } - timer.Stop() - flushTook := time.Since(start) - took - if flushTook > slowFlush { - if log.CanColorStdout { - fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), fmt.Formatter(log.NewColoredValue(flushTook, log.Bold, log.FgRed))) - } else { - fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", t.Name(), flushTook) - } - } - _ = writerCloser.Close() - } -} - -// Printf takes a format and args and prints the string to os.Stdout -func Printf(format string, args ...interface{}) { - if log.CanColorStdout { - for i := 0; i < len(args); i++ { - args[i] = log.NewColoredValue(args[i]) - } - } - fmt.Fprintf(os.Stdout, "\t"+format, args...) -} - -// NewTestLogger creates a TestLogger as a log.LoggerProvider -func NewTestLogger() log.LoggerProvider { - logger := &TestLogger{} - logger.Colorize = log.CanColorStdout - logger.Level = log.TRACE - return logger -} - -// Init inits connection writer with json config. -// json config only need key "level". -func (log *TestLogger) Init(config string) error { - err := json.Unmarshal([]byte(config), log) - if err != nil { - return err - } - log.NewWriterLogger(writerCloser) - return nil -} - -// Flush when log should be flushed -func (log *TestLogger) Flush() { -} - -// ReleaseReopen does nothing -func (log *TestLogger) ReleaseReopen() error { - return nil -} - -// GetName returns the default name for this implementation -func (log *TestLogger) GetName() string { - return "test" -} diff --git a/models/migrations/base/tests.go b/models/migrations/base/tests.go index 124111f51f..08eded7fdc 100644 --- a/models/migrations/base/tests.go +++ b/models/migrations/base/tests.go @@ -11,7 +11,6 @@ import ( "path" "path/filepath" "runtime" - "strings" "testing" "code.gitea.io/gitea/models/unittest" @@ -19,6 +18,7 @@ import ( "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/testlogger" "github.com/stretchr/testify/assert" "xorm.io/xorm" @@ -32,7 +32,7 @@ func PrepareTestEnv(t *testing.T, skip int, syncModels ...interface{}) (*xorm.En t.Helper() ourSkip := 2 ourSkip += skip - deferFn := PrintCurrentTest(t, ourSkip) + deferFn := testlogger.PrintCurrentTest(t, ourSkip) assert.NoError(t, os.RemoveAll(setting.RepoRootPath)) assert.NoError(t, unittest.CopyDir(path.Join(filepath.Dir(setting.AppPath), "tests/gitea-repositories-meta"), setting.RepoRootPath)) ownerDirs, err := os.ReadDir(setting.RepoRootPath) @@ -110,9 +110,7 @@ func PrepareTestEnv(t *testing.T, skip int, syncModels ...interface{}) (*xorm.En } func MainTest(m *testing.M) { - log.Register("test", NewTestLogger) - _, filename, _, _ := runtime.Caller(0) - prefix = strings.TrimSuffix(filename, "tests/testlogger.go") + log.Register("test", testlogger.NewTestLogger) giteaRoot := base.SetupGiteaRoot() if giteaRoot == "" { diff --git a/models/unittest/testdb.go b/models/unittest/testdb.go index a5b126350d..10a70ad9f8 100644 --- a/models/unittest/testdb.go +++ b/models/unittest/testdb.go @@ -202,6 +202,9 @@ type FixturesOptions struct { func CreateTestEngine(opts FixturesOptions) error { x, err := xorm.NewEngine("sqlite3", "file::memory:?cache=shared&_txlock=immediate") if err != nil { + if strings.Contains(err.Error(), "unknown driver") { + return fmt.Errorf(`sqlite3 requires: import _ "github.com/mattn/go-sqlite3" or -tags sqlite,sqlite_unlock_notify%s%w`, "\n", err) + } return err } x.SetMapper(names.GonicMapper{}) |