diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2022-04-26 02:03:01 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-25 20:03:01 +0200 |
commit | 7c164d5a918c2461bbdf2a1ef79a20908c3259be (patch) | |
tree | ca46ea235e10fc43d081cd375df953e021390f0f /services | |
parent | 257cea654cf1fb592fdb5eacc9c1d5a97611a3f1 (diff) | |
download | gitea-7c164d5a918c2461bbdf2a1ef79a20908c3259be.tar.gz gitea-7c164d5a918c2461bbdf2a1ef79a20908c3259be.zip |
Use queue instead of memory queue in webhook send service (#19390)
Diffstat (limited to 'services')
-rw-r--r-- | services/webhook/deliver.go | 59 | ||||
-rw-r--r-- | services/webhook/main_test.go | 4 | ||||
-rw-r--r-- | services/webhook/webhook.go | 49 |
3 files changed, 60 insertions, 52 deletions
diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 7998be53c2..77744473f1 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -15,7 +15,6 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "sync" "time" @@ -26,6 +25,7 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "github.com/gobwas/glob" @@ -202,10 +202,8 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return nil } -// DeliverHooks checks and delivers undelivered hooks. -// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue -// or a full queue. Then more hooks could be sent at same time. -func DeliverHooks(ctx context.Context) { +// populateDeliverHooks checks and delivers undelivered hooks. +func populateDeliverHooks(ctx context.Context) { select { case <-ctx.Done(): return @@ -226,42 +224,9 @@ func DeliverHooks(ctx context.Context) { return default: } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } - - // Start listening on new hook requests. - for { - select { - case <-ctx.Done(): - hookQueue.Close() - return - case repoIDStr := <-hookQueue.Queue(): - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) - - repoID, err := strconv.ParseInt(repoIDStr, 10, 64) - if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } - tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) - continue - } - for _, t := range tasks { - select { - case <-ctx.Done(): - return - default: - } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } + if err := addToTask(t.RepoID); err != nil { + log.Error("DeliverHook failed [%d]: %v", t.RepoID, err) } } } @@ -297,8 +262,8 @@ func webhookProxy() func(req *http.Request) (*url.URL, error) { } } -// InitDeliverHooks starts the hooks delivery thread -func InitDeliverHooks() { +// Init starts the hooks delivery thread +func Init() error { timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second allowedHostListValue := setting.Webhook.AllowedHostList @@ -316,5 +281,13 @@ func InitDeliverHooks() { }, } - go graceful.GetManager().RunWithShutdownContext(DeliverHooks) + hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "") + if hookQueue == nil { + return fmt.Errorf("Unable to create webhook_sender Queue") + } + go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) + + populateDeliverHooks(graceful.GetManager().HammerContext()) + + return nil } diff --git a/services/webhook/main_test.go b/services/webhook/main_test.go index 25b9df0af6..1dc2e1bd83 100644 --- a/services/webhook/main_test.go +++ b/services/webhook/main_test.go @@ -9,12 +9,16 @@ import ( "testing" "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/setting" _ "code.gitea.io/gitea/models" ) func TestMain(m *testing.M) { + setting.LoadForTest() + setting.NewQueueService() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", ".."), + SetUp: Init, }) } diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index a3efc7535f..b15b8173f5 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -12,10 +12,11 @@ import ( repo_model "code.gitea.io/gitea/models/repo" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" api "code.gitea.io/gitea/modules/structs" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" "github.com/gobwas/glob" @@ -80,7 +81,7 @@ func IsValidHookTaskType(name string) bool { } // hookQueue is a global queue of web hooks -var hookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength) +var hookQueue queue.UniqueQueue // getPayloadBranch returns branch for hook event, if applicable. func getPayloadBranch(p api.Payloader) string { @@ -101,14 +102,47 @@ func getPayloadBranch(p api.Payloader) string { return "" } +// handle passed PR IDs and test the PRs +func handle(data ...queue.Data) []queue.Data { + for _, datum := range data { + repoIDStr := datum.(string) + log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) + + repoID, err := strconv.ParseInt(repoIDStr, 10, 64) + if err != nil { + log.Error("Invalid repo ID: %s", repoIDStr) + continue + } + + tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) + if err != nil { + log.Error("Get repository [%d] hook tasks: %v", repoID, err) + continue + } + for _, t := range tasks { + if err = Deliver(graceful.GetManager().HammerContext(), t); err != nil { + log.Error("deliver: %v", err) + } + } + } + return nil +} + +func addToTask(repoID int64) error { + err := hookQueue.PushFunc(strconv.FormatInt(repoID, 10), nil) + if err != nil && err != queue.ErrAlreadyInQueue { + return err + } + return nil +} + // PrepareWebhook adds special webhook to task queue for given payload. func PrepareWebhook(w *webhook_model.Webhook, repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { if err := prepareWebhook(w, repo, event, p); err != nil { return err } - go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) - return nil + return addToTask(repo.ID) } func checkBranch(w *webhook_model.Webhook, branch string) bool { @@ -188,8 +222,7 @@ func PrepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventT return err } - go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) - return nil + return addToTask(repo.ID) } func prepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { @@ -240,7 +273,5 @@ func ReplayHookTask(w *webhook_model.Webhook, uuid string) error { return err } - go hookQueue.Add(strconv.FormatInt(t.RepoID, 10)) - - return nil + return addToTask(t.RepoID) } |