aboutsummaryrefslogtreecommitdiffstats
path: root/routers/web/admin
diff options
context:
space:
mode:
authorwxiaoguang <wxiaoguang@gmail.com>2023-05-08 19:49:59 +0800
committerGitHub <noreply@github.com>2023-05-08 19:49:59 +0800
commit6f9c278559789066aa831c1df25b0d866103d02d (patch)
treee3a1880e0d4cf88916f9d1b65d82fbd4c41ea47f /routers/web/admin
parentcb700aedd1e670fb47b8cf0cd67fb117a1ad88a2 (diff)
downloadgitea-6f9c278559789066aa831c1df25b0d866103d02d.tar.gz
gitea-6f9c278559789066aa831c1df25b0d866103d02d.zip
Rewrite queue (#24505)
# ⚠️ Breaking Many deprecated queue config options are removed (actually, they should have been removed in 1.18/1.19). If you see the fatal message when starting Gitea: "Please update your app.ini to remove deprecated config options", please follow the error messages to remove these options from your app.ini. Example: ``` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options ``` Many options in `[queue]` are are dropped, including: `WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`, `BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed from app.ini. # The problem The old queue package has some legacy problems: * complexity: I doubt few people could tell how it works. * maintainability: Too many channels and mutex/cond are mixed together, too many different structs/interfaces depends each other. * stability: due to the complexity & maintainability, sometimes there are strange bugs and difficult to debug, and some code doesn't have test (indeed some code is difficult to test because a lot of things are mixed together). * general applicability: although it is called "queue", its behavior is not a well-known queue. * scalability: it doesn't seem easy to make it work with a cluster without breaking its behaviors. It came from some very old code to "avoid breaking", however, its technical debt is too heavy now. It's a good time to introduce a better "queue" package. # The new queue package It keeps using old config and concept as much as possible. * It only contains two major kinds of concepts: * The "base queue": channel, levelqueue, redis * They have the same abstraction, the same interface, and they are tested by the same testing code. * The "WokerPoolQueue", it uses the "base queue" to provide "worker pool" function, calls the "handler" to process the data in the base queue. * The new code doesn't do "PushBack" * Think about a queue with many workers, the "PushBack" can't guarantee the order for re-queued unhandled items, so in new code it just does "normal push" * The new code doesn't do "pause/resume" * The "pause/resume" was designed to handle some handler's failure: eg: document indexer (elasticsearch) is down * If a queue is paused for long time, either the producers blocks or the new items are dropped. * The new code doesn't do such "pause/resume" trick, it's not a common queue's behavior and it doesn't help much. * If there are unhandled items, the "push" function just blocks for a few seconds and then re-queue them and retry. * The new code doesn't do "worker booster" * Gitea's queue's handlers are light functions, the cost is only the go-routine, so it doesn't make sense to "boost" them. * The new code only use "max worker number" to limit the concurrent workers. * The new "Push" never blocks forever * Instead of creating more and more blocking goroutines, return an error is more friendly to the server and to the end user. There are more details in code comments: eg: the "Flush" problem, the strange "code.index" hanging problem, the "immediate" queue problem. Almost ready for review. TODO: * [x] add some necessary comments during review * [x] add some more tests if necessary * [x] update documents and config options * [x] test max worker / active worker * [x] re-run the CI tasks to see whether any test is flaky * [x] improve the `handleOldLengthConfiguration` to provide more friendly messages * [x] fine tune default config values (eg: length?) ## Code coverage: ![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
Diffstat (limited to 'routers/web/admin')
-rw-r--r--routers/web/admin/admin.go178
-rw-r--r--routers/web/admin/queue.go59
2 files changed, 63 insertions, 174 deletions
diff --git a/routers/web/admin/admin.go b/routers/web/admin/admin.go
index 35c387c28b..cbe1482a24 100644
--- a/routers/web/admin/admin.go
+++ b/routers/web/admin/admin.go
@@ -8,13 +8,11 @@ import (
"fmt"
"net/http"
"runtime"
- "strconv"
"time"
activities_model "code.gitea.io/gitea/models/activities"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/context"
- "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
@@ -25,10 +23,10 @@ import (
)
const (
- tplDashboard base.TplName = "admin/dashboard"
- tplMonitor base.TplName = "admin/monitor"
- tplStacktrace base.TplName = "admin/stacktrace"
- tplQueue base.TplName = "admin/queue"
+ tplDashboard base.TplName = "admin/dashboard"
+ tplMonitor base.TplName = "admin/monitor"
+ tplStacktrace base.TplName = "admin/stacktrace"
+ tplQueueManage base.TplName = "admin/queue_manage"
)
var sysStatus struct {
@@ -188,171 +186,3 @@ func MonitorCancel(ctx *context.Context) {
"redirect": setting.AppSubURL + "/admin/monitor",
})
}
-
-// Queue shows details for a specific queue
-func Queue(ctx *context.Context) {
- qid := ctx.ParamsInt64("qid")
- mq := queue.GetManager().GetManagedQueue(qid)
- if mq == nil {
- ctx.Status(http.StatusNotFound)
- return
- }
- ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", mq.Name)
- ctx.Data["PageIsAdminMonitor"] = true
- ctx.Data["Queue"] = mq
- ctx.HTML(http.StatusOK, tplQueue)
-}
-
-// WorkerCancel cancels a worker group
-func WorkerCancel(ctx *context.Context) {
- qid := ctx.ParamsInt64("qid")
- mq := queue.GetManager().GetManagedQueue(qid)
- if mq == nil {
- ctx.Status(http.StatusNotFound)
- return
- }
- pid := ctx.ParamsInt64("pid")
- mq.CancelWorkers(pid)
- ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.cancelling"))
- ctx.JSON(http.StatusOK, map[string]interface{}{
- "redirect": setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10),
- })
-}
-
-// Flush flushes a queue
-func Flush(ctx *context.Context) {
- qid := ctx.ParamsInt64("qid")
- mq := queue.GetManager().GetManagedQueue(qid)
- if mq == nil {
- ctx.Status(http.StatusNotFound)
- return
- }
- timeout, err := time.ParseDuration(ctx.FormString("timeout"))
- if err != nil {
- timeout = -1
- }
- ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.flush.added", mq.Name))
- go func() {
- err := mq.Flush(timeout)
- if err != nil {
- log.Error("Flushing failure for %s: Error %v", mq.Name, err)
- }
- }()
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
-}
-
-// Pause pauses a queue
-func Pause(ctx *context.Context) {
- qid := ctx.ParamsInt64("qid")
- mq := queue.GetManager().GetManagedQueue(qid)
- if mq == nil {
- ctx.Status(404)
- return
- }
- mq.Pause()
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
-}
-
-// Resume resumes a queue
-func Resume(ctx *context.Context) {
- qid := ctx.ParamsInt64("qid")
- mq := queue.GetManager().GetManagedQueue(qid)
- if mq == nil {
- ctx.Status(404)
- return
- }
- mq.Resume()
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
-}
-
-// AddWorkers adds workers to a worker group
-func AddWorkers(ctx *context.Context) {
- qid := ctx.ParamsInt64("qid")
- mq := queue.GetManager().GetManagedQueue(qid)
- if mq == nil {
- ctx.Status(http.StatusNotFound)
- return
- }
- number := ctx.FormInt("number")
- if number < 1 {
- ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.mustnumbergreaterzero"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
- return
- }
- timeout, err := time.ParseDuration(ctx.FormString("timeout"))
- if err != nil {
- ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.musttimeoutduration"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
- return
- }
- if _, ok := mq.Managed.(queue.ManagedPool); !ok {
- ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
- return
- }
- mq.AddWorkers(number, timeout)
- ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
-}
-
-// SetQueueSettings sets the maximum number of workers and other settings for this queue
-func SetQueueSettings(ctx *context.Context) {
- qid := ctx.ParamsInt64("qid")
- mq := queue.GetManager().GetManagedQueue(qid)
- if mq == nil {
- ctx.Status(http.StatusNotFound)
- return
- }
- if _, ok := mq.Managed.(queue.ManagedPool); !ok {
- ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
- return
- }
-
- maxNumberStr := ctx.FormString("max-number")
- numberStr := ctx.FormString("number")
- timeoutStr := ctx.FormString("timeout")
-
- var err error
- var maxNumber, number int
- var timeout time.Duration
- if len(maxNumberStr) > 0 {
- maxNumber, err = strconv.Atoi(maxNumberStr)
- if err != nil {
- ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.maxnumberworkers.error"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
- return
- }
- if maxNumber < -1 {
- maxNumber = -1
- }
- } else {
- maxNumber = mq.MaxNumberOfWorkers()
- }
-
- if len(numberStr) > 0 {
- number, err = strconv.Atoi(numberStr)
- if err != nil || number < 0 {
- ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.numberworkers.error"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
- return
- }
- } else {
- number = mq.BoostWorkers()
- }
-
- if len(timeoutStr) > 0 {
- timeout, err = time.ParseDuration(timeoutStr)
- if err != nil {
- ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.timeout.error"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
- return
- }
- } else {
- timeout = mq.BoostTimeout()
- }
-
- mq.SetPoolSettings(maxNumber, number, timeout)
- ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
- ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
-}
diff --git a/routers/web/admin/queue.go b/routers/web/admin/queue.go
new file mode 100644
index 0000000000..1d57bc54c9
--- /dev/null
+++ b/routers/web/admin/queue.go
@@ -0,0 +1,59 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package admin
+
+import (
+ "net/http"
+ "strconv"
+
+ "code.gitea.io/gitea/modules/context"
+ "code.gitea.io/gitea/modules/queue"
+ "code.gitea.io/gitea/modules/setting"
+)
+
+// Queue shows details for a specific queue
+func Queue(ctx *context.Context) {
+ qid := ctx.ParamsInt64("qid")
+ mq := queue.GetManager().GetManagedQueue(qid)
+ if mq == nil {
+ ctx.Status(http.StatusNotFound)
+ return
+ }
+ ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", mq.GetName())
+ ctx.Data["PageIsAdminMonitor"] = true
+ ctx.Data["Queue"] = mq
+ ctx.HTML(http.StatusOK, tplQueueManage)
+}
+
+// QueueSet sets the maximum number of workers and other settings for this queue
+func QueueSet(ctx *context.Context) {
+ qid := ctx.ParamsInt64("qid")
+ mq := queue.GetManager().GetManagedQueue(qid)
+ if mq == nil {
+ ctx.Status(http.StatusNotFound)
+ return
+ }
+
+ maxNumberStr := ctx.FormString("max-number")
+
+ var err error
+ var maxNumber int
+ if len(maxNumberStr) > 0 {
+ maxNumber, err = strconv.Atoi(maxNumberStr)
+ if err != nil {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.maxnumberworkers.error"))
+ ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
+ return
+ }
+ if maxNumber < -1 {
+ maxNumber = -1
+ }
+ } else {
+ maxNumber = mq.GetWorkerMaxNumber()
+ }
+
+ mq.SetWorkerMaxNumber(maxNumber)
+ ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
+ ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
+}