diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2022-04-26 02:03:01 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-25 20:03:01 +0200 |
commit | 7c164d5a918c2461bbdf2a1ef79a20908c3259be (patch) | |
tree | ca46ea235e10fc43d081cd375df953e021390f0f /services/webhook/deliver.go | |
parent | 257cea654cf1fb592fdb5eacc9c1d5a97611a3f1 (diff) | |
download | gitea-7c164d5a918c2461bbdf2a1ef79a20908c3259be.tar.gz gitea-7c164d5a918c2461bbdf2a1ef79a20908c3259be.zip |
Use queue instead of memory queue in webhook send service (#19390)
Diffstat (limited to 'services/webhook/deliver.go')
-rw-r--r-- | services/webhook/deliver.go | 59 |
1 files changed, 16 insertions, 43 deletions
diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 7998be53c2..77744473f1 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -15,7 +15,6 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "sync" "time" @@ -26,6 +25,7 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "github.com/gobwas/glob" @@ -202,10 +202,8 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return nil } -// DeliverHooks checks and delivers undelivered hooks. -// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue -// or a full queue. Then more hooks could be sent at same time. -func DeliverHooks(ctx context.Context) { +// populateDeliverHooks checks and delivers undelivered hooks. +func populateDeliverHooks(ctx context.Context) { select { case <-ctx.Done(): return @@ -226,42 +224,9 @@ func DeliverHooks(ctx context.Context) { return default: } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } - - // Start listening on new hook requests. - for { - select { - case <-ctx.Done(): - hookQueue.Close() - return - case repoIDStr := <-hookQueue.Queue(): - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) - - repoID, err := strconv.ParseInt(repoIDStr, 10, 64) - if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } - tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) - continue - } - for _, t := range tasks { - select { - case <-ctx.Done(): - return - default: - } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } + if err := addToTask(t.RepoID); err != nil { + log.Error("DeliverHook failed [%d]: %v", t.RepoID, err) } } } @@ -297,8 +262,8 @@ func webhookProxy() func(req *http.Request) (*url.URL, error) { } } -// InitDeliverHooks starts the hooks delivery thread -func InitDeliverHooks() { +// Init starts the hooks delivery thread +func Init() error { timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second allowedHostListValue := setting.Webhook.AllowedHostList @@ -316,5 +281,13 @@ func InitDeliverHooks() { }, } - go graceful.GetManager().RunWithShutdownContext(DeliverHooks) + hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "") + if hookQueue == nil { + return fmt.Errorf("Unable to create webhook_sender Queue") + } + go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) + + populateDeliverHooks(graceful.GetManager().HammerContext()) + + return nil } |