aboutsummaryrefslogtreecommitdiffstats
path: root/routers
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-01-07 11:23:09 +0000
committerAntoine GIRARD <sapk@users.noreply.github.com>2020-01-07 12:23:09 +0100
commit62eb1b0f2530a5ae1ce9b729378c0c8066174215 (patch)
treee567b2a9d91e69c0f2bccfeaf1a7341b4dda2706 /routers
parentf71e1c8e796b099f4634bcd358e48189a97dcbad (diff)
downloadgitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.tar.gz
gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.zip
Graceful Queues: Issue Indexing and Tasks (#9363)
* Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'routers')
-rw-r--r--routers/admin/admin.go127
-rw-r--r--routers/routes/routes.go12
2 files changed, 137 insertions, 2 deletions
diff --git a/routers/admin/admin.go b/routers/admin/admin.go
index ccedcaf8a6..055b8f5a5e 100644
--- a/routers/admin/admin.go
+++ b/routers/admin/admin.go
@@ -11,6 +11,7 @@ import (
"net/url"
"os"
"runtime"
+ "strconv"
"strings"
"time"
@@ -22,6 +23,7 @@ import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/mailer"
@@ -35,6 +37,7 @@ const (
tplDashboard base.TplName = "admin/dashboard"
tplConfig base.TplName = "admin/config"
tplMonitor base.TplName = "admin/monitor"
+ tplQueue base.TplName = "admin/queue"
)
var (
@@ -355,6 +358,7 @@ func Monitor(ctx *context.Context) {
ctx.Data["PageIsAdminMonitor"] = true
ctx.Data["Processes"] = process.GetManager().Processes()
ctx.Data["Entries"] = cron.ListTasks()
+ ctx.Data["Queues"] = queue.GetManager().ManagedQueues()
ctx.HTML(200, tplMonitor)
}
@@ -366,3 +370,126 @@ func MonitorCancel(ctx *context.Context) {
"redirect": ctx.Repo.RepoLink + "/admin/monitor",
})
}
+
+// Queue shows details for a specific queue
+func Queue(ctx *context.Context) {
+ qid := ctx.ParamsInt64("qid")
+ mq := queue.GetManager().GetManagedQueue(qid)
+ if mq == nil {
+ ctx.Status(404)
+ return
+ }
+ ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", mq.Name)
+ ctx.Data["PageIsAdmin"] = true
+ ctx.Data["PageIsAdminMonitor"] = true
+ ctx.Data["Queue"] = mq
+ ctx.HTML(200, tplQueue)
+}
+
+// WorkerCancel cancels a worker group
+func WorkerCancel(ctx *context.Context) {
+ qid := ctx.ParamsInt64("qid")
+ mq := queue.GetManager().GetManagedQueue(qid)
+ if mq == nil {
+ ctx.Status(404)
+ return
+ }
+ pid := ctx.ParamsInt64("pid")
+ mq.CancelWorkers(pid)
+ ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.cancelling"))
+ ctx.JSON(200, map[string]interface{}{
+ "redirect": setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid),
+ })
+}
+
+// AddWorkers adds workers to a worker group
+func AddWorkers(ctx *context.Context) {
+ qid := ctx.ParamsInt64("qid")
+ mq := queue.GetManager().GetManagedQueue(qid)
+ if mq == nil {
+ ctx.Status(404)
+ return
+ }
+ number := ctx.QueryInt("number")
+ if number < 1 {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.mustnumbergreaterzero"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+ return
+ }
+ timeout, err := time.ParseDuration(ctx.Query("timeout"))
+ if err != nil {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.musttimeoutduration"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+ return
+ }
+ if mq.Pool == nil {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+ return
+ }
+ mq.AddWorkers(number, timeout)
+ ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+}
+
+// SetQueueSettings sets the maximum number of workers and other settings for this queue
+func SetQueueSettings(ctx *context.Context) {
+ qid := ctx.ParamsInt64("qid")
+ mq := queue.GetManager().GetManagedQueue(qid)
+ if mq == nil {
+ ctx.Status(404)
+ return
+ }
+ if mq.Pool == nil {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+ return
+ }
+
+ maxNumberStr := ctx.Query("max-number")
+ numberStr := ctx.Query("number")
+ timeoutStr := ctx.Query("timeout")
+
+ var err error
+ var maxNumber, number int
+ var timeout time.Duration
+ if len(maxNumberStr) > 0 {
+ maxNumber, err = strconv.Atoi(maxNumberStr)
+ if err != nil {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.maxnumberworkers.error"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+ return
+ }
+ if maxNumber < -1 {
+ maxNumber = -1
+ }
+ } else {
+ maxNumber = mq.MaxNumberOfWorkers()
+ }
+
+ if len(numberStr) > 0 {
+ number, err = strconv.Atoi(numberStr)
+ if err != nil || number < 0 {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.numberworkers.error"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+ return
+ }
+ } else {
+ number = mq.BoostWorkers()
+ }
+
+ if len(timeoutStr) > 0 {
+ timeout, err = time.ParseDuration(timeoutStr)
+ if err != nil {
+ ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.timeout.error"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+ return
+ }
+ } else {
+ timeout = mq.Pool.BoostTimeout()
+ }
+
+ mq.SetSettings(maxNumber, number, timeout)
+ ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
+ ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+}
diff --git a/routers/routes/routes.go b/routers/routes/routes.go
index 888c92ac4a..2f886f749d 100644
--- a/routers/routes/routes.go
+++ b/routers/routes/routes.go
@@ -410,8 +410,16 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("", adminReq, admin.Dashboard)
m.Get("/config", admin.Config)
m.Post("/config/test_mail", admin.SendTestMail)
- m.Get("/monitor", admin.Monitor)
- m.Post("/monitor/cancel/:pid", admin.MonitorCancel)
+ m.Group("/monitor", func() {
+ m.Get("", admin.Monitor)
+ m.Post("/cancel/:pid", admin.MonitorCancel)
+ m.Group("/queue/:qid", func() {
+ m.Get("", admin.Queue)
+ m.Post("/set", admin.SetQueueSettings)
+ m.Post("/add", admin.AddWorkers)
+ m.Post("/cancel/:pid", admin.WorkerCancel)
+ })
+ })
m.Group("/users", func() {
m.Get("", admin.Users)