diff options
author | zeripath <art27@cantab.net> | 2022-11-23 14:10:04 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-23 22:10:04 +0800 |
commit | 787f6c32278f1fb814ce405e960dd7c50a7d2b3b (patch) | |
tree | 48879d3b6eecebc20286e32e6f22800b826ffc68 /services | |
parent | 4c00d8f916f4a347d04001418b766f1849214181 (diff) | |
download | gitea-787f6c32278f1fb814ce405e960dd7c50a7d2b3b.tar.gz gitea-787f6c32278f1fb814ce405e960dd7c50a7d2b3b.zip |
Ensure that Webhook tasks are not double delivered (#21558)
When re-retrieving hook tasks from the DB double check if they have not
been delivered in the meantime. Further ensure that tasks are marked as
delivered when they are being delivered.
In addition:
* Improve the error reporting and make sure that the webhook task
population script runs in a separate goroutine.
* Only get hook task IDs out of the DB instead of the whole task when
repopulating the queue
* When repopulating the queue make the DB request paged
Ref #17940
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: delvh <dev.lh@web.de>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'services')
-rw-r--r-- | services/webhook/deliver.go | 74 | ||||
-rw-r--r-- | services/webhook/deliver_test.go | 10 | ||||
-rw-r--r-- | services/webhook/webhook.go | 25 |
3 files changed, 80 insertions, 29 deletions
diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 85717e0978..07fdf18c83 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -23,6 +23,7 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/hostmatcher" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" @@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return } // There was a panic whilst delivering a hook... - log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) + log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) }() t.IsDelivered = true @@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { switch w.HTTPMethod { case "": - log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID) + log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL) fallthrough case http.MethodPost: switch w.ContentType { @@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { case http.MethodGet: u, err := url.Parse(w.URL) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err) } vals := u.Query() vals["payload"] = []string{t.PayloadContent} u.RawQuery = vals.Encode() req, err = http.NewRequest("GET", u.String(), nil) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err) } case http.MethodPut: switch w.Type { @@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID)) req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent)) if err != nil { - return err + return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } default: - return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) + return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) } var signatureSHA1 string @@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { Headers: map[string]string{}, } + // OK We're now ready to attempt to deliver the task - we must double check that it + // has not been delivered in the meantime + updated, err := webhook_model.MarkTaskDelivered(ctx, t) + if err != nil { + log.Error("MarkTaskDelivered[%d]: %v", t.ID, err) + return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) + } + if !updated { + // This webhook task has already been attempted to be delivered or is in the process of being delivered + log.Trace("Webhook Task[%d] already delivered", t.ID) + return nil + } + + // All code from this point will update the hook task defer func() { t.Delivered = time.Now().UnixNano() if t.IsSucceed { @@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { } if !w.IsActive { + log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID) return nil } resp, err := webhookHTTPClient.Do(req.WithContext(ctx)) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err) } defer resp.Body.Close() @@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { p, err := io.ReadAll(resp.Body) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err) - return err + return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err) } t.ResponseInfo.Body = string(p) return nil @@ -272,17 +288,37 @@ func Init() error { } go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) - tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext()) - if err != nil { - log.Error("FindUndeliveredHookTasks failed: %v", err) - return err - } + go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue) + + return nil +} + +func populateWebhookSendingQueue(ctx context.Context) { + ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue") + defer finished() - for _, task := range tasks { - if err := enqueueHookTask(task); err != nil { - log.Error("enqueueHookTask failed: %v", err) + lowerID := int64(0) + for { + taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID) + if err != nil { + log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err) + return + } + if len(taskIDs) == 0 { + return + } + lowerID = taskIDs[len(taskIDs)-1] + + for _, taskID := range taskIDs { + select { + case <-ctx.Done(): + log.Warn("Shutdown before Webhook Sending queue finishing being populated") + return + default: + } + if err := enqueueHookTask(taskID); err != nil { + log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err) + } } } - - return nil } diff --git a/services/webhook/deliver_test.go b/services/webhook/deliver_test.go index 83ca7d6178..498cf7d159 100644 --- a/services/webhook/deliver_test.go +++ b/services/webhook/deliver_test.go @@ -16,6 +16,7 @@ import ( "code.gitea.io/gitea/models/unittest" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/setting" + api "code.gitea.io/gitea/modules/structs" "github.com/stretchr/testify/assert" ) @@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) { err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken") assert.NoError(t, err) assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook)) + db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true) - hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush} + hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}} + + hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask) + assert.NoError(t, err) + if !assert.NotNil(t, hookTask) { + return + } assert.NoError(t, Deliver(context.Background(), hookTask)) select { diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index 1780022eb4..5c9139b41f 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data { for _, taskID := range data { task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64)) if err != nil { - log.Error("GetHookTaskByID failed: %v", err) - } else { - if err := Deliver(ctx, task); err != nil { - log.Error("webhook.Deliver failed: %v", err) - } + log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err) + continue + } + + if task.IsDelivered { + // Already delivered in the meantime + log.Trace("Task[%d] has already been delivered", task.ID) + continue + } + + if err := Deliver(ctx, task); err != nil { + log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err) } } return nil } -func enqueueHookTask(task *webhook_model.HookTask) error { - err := hookQueue.PushFunc(task.ID, nil) +func enqueueHookTask(taskID int64) error { + err := hookQueue.Push(taskID) if err != nil && err != queue.ErrAlreadyInQueue { return err } @@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook return fmt.Errorf("CreateHookTask: %w", err) } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) } // PrepareWebhooks adds new webhooks to task queue for given payload. @@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string) return err } - return enqueueHookTask(task) + return enqueueHookTask(task.ID) } |