summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--models/webhook/hooktask.go22
-rw-r--r--services/webhook/deliver.go74
-rw-r--r--services/webhook/deliver_test.go10
-rw-r--r--services/webhook/webhook.go25
4 files changed, 99 insertions, 32 deletions
diff --git a/models/webhook/hooktask.go b/models/webhook/hooktask.go
index 246484aea9..92d9e97383 100644
--- a/models/webhook/hooktask.go
+++ b/models/webhook/hooktask.go
@@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask,
return newTask, db.Insert(ctx, newTask)
}
-// FindUndeliveredHookTasks represents find the undelivered hook tasks
-func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
- tasks := make([]*HookTask, 0, 10)
+// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
+func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
+ const batchSize = 100
+
+ tasks := make([]int64, 0, batchSize)
return tasks, db.GetEngine(ctx).
+ Select("id").
+ Table(new(HookTask)).
Where("is_delivered=?", false).
+ And("id > ?", lowerID).
+ Asc("id").
+ Limit(batchSize).
Find(&tasks)
}
+func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
+ count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{
+ ID: task.ID,
+ IsDelivered: true,
+ })
+
+ return count != 0, err
+}
+
// CleanupHookTaskTable deletes rows from hook_task as needed.
func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
log.Trace("Doing: CleanupHookTaskTable")
diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go
index 85717e0978..07fdf18c83 100644
--- a/services/webhook/deliver.go
+++ b/services/webhook/deliver.go
@@ -23,6 +23,7 @@ import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/hostmatcher"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
@@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
return
}
// There was a panic whilst delivering a hook...
- log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
+ log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
}()
t.IsDelivered = true
@@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
switch w.HTTPMethod {
case "":
- log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID)
+ log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
fallthrough
case http.MethodPost:
switch w.ContentType {
@@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
case http.MethodGet:
u, err := url.Parse(w.URL)
if err != nil {
- return err
+ return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
}
vals := u.Query()
vals["payload"] = []string{t.PayloadContent}
u.RawQuery = vals.Encode()
req, err = http.NewRequest("GET", u.String(), nil)
if err != nil {
- return err
+ return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
}
case http.MethodPut:
switch w.Type {
@@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID))
req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent))
if err != nil {
- return err
+ return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
}
default:
- return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
+ return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
}
default:
- return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
+ return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
}
var signatureSHA1 string
@@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
Headers: map[string]string{},
}
+ // OK We're now ready to attempt to deliver the task - we must double check that it
+ // has not been delivered in the meantime
+ updated, err := webhook_model.MarkTaskDelivered(ctx, t)
+ if err != nil {
+ log.Error("MarkTaskDelivered[%d]: %v", t.ID, err)
+ return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
+ }
+ if !updated {
+ // This webhook task has already been attempted to be delivered or is in the process of being delivered
+ log.Trace("Webhook Task[%d] already delivered", t.ID)
+ return nil
+ }
+
+ // All code from this point will update the hook task
defer func() {
t.Delivered = time.Now().UnixNano()
if t.IsSucceed {
@@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
}
if !w.IsActive {
+ log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
return nil
}
resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
- return err
+ return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
}
defer resp.Body.Close()
@@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
p, err := io.ReadAll(resp.Body)
if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
- return err
+ return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
}
t.ResponseInfo.Body = string(p)
return nil
@@ -272,17 +288,37 @@ func Init() error {
}
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
- tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext())
- if err != nil {
- log.Error("FindUndeliveredHookTasks failed: %v", err)
- return err
- }
+ go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)
+
+ return nil
+}
+
+func populateWebhookSendingQueue(ctx context.Context) {
+ ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
+ defer finished()
- for _, task := range tasks {
- if err := enqueueHookTask(task); err != nil {
- log.Error("enqueueHookTask failed: %v", err)
+ lowerID := int64(0)
+ for {
+ taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
+ if err != nil {
+ log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
+ return
+ }
+ if len(taskIDs) == 0 {
+ return
+ }
+ lowerID = taskIDs[len(taskIDs)-1]
+
+ for _, taskID := range taskIDs {
+ select {
+ case <-ctx.Done():
+ log.Warn("Shutdown before Webhook Sending queue finishing being populated")
+ return
+ default:
+ }
+ if err := enqueueHookTask(taskID); err != nil {
+ log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
+ }
}
}
-
- return nil
}
diff --git a/services/webhook/deliver_test.go b/services/webhook/deliver_test.go
index 83ca7d6178..498cf7d159 100644
--- a/services/webhook/deliver_test.go
+++ b/services/webhook/deliver_test.go
@@ -16,6 +16,7 @@ import (
"code.gitea.io/gitea/models/unittest"
webhook_model "code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/setting"
+ api "code.gitea.io/gitea/modules/structs"
"github.com/stretchr/testify/assert"
)
@@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) {
err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken")
assert.NoError(t, err)
assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook))
+ db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true)
- hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush}
+ hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}}
+
+ hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask)
+ assert.NoError(t, err)
+ if !assert.NotNil(t, hookTask) {
+ return
+ }
assert.NoError(t, Deliver(context.Background(), hookTask))
select {
diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go
index 1780022eb4..5c9139b41f 100644
--- a/services/webhook/webhook.go
+++ b/services/webhook/webhook.go
@@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data {
for _, taskID := range data {
task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64))
if err != nil {
- log.Error("GetHookTaskByID failed: %v", err)
- } else {
- if err := Deliver(ctx, task); err != nil {
- log.Error("webhook.Deliver failed: %v", err)
- }
+ log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err)
+ continue
+ }
+
+ if task.IsDelivered {
+ // Already delivered in the meantime
+ log.Trace("Task[%d] has already been delivered", task.ID)
+ continue
+ }
+
+ if err := Deliver(ctx, task); err != nil {
+ log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err)
}
}
return nil
}
-func enqueueHookTask(task *webhook_model.HookTask) error {
- err := hookQueue.PushFunc(task.ID, nil)
+func enqueueHookTask(taskID int64) error {
+ err := hookQueue.Push(taskID)
if err != nil && err != queue.ErrAlreadyInQueue {
return err
}
@@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook
return fmt.Errorf("CreateHookTask: %w", err)
}
- return enqueueHookTask(task)
+ return enqueueHookTask(task.ID)
}
// PrepareWebhooks adds new webhooks to task queue for given payload.
@@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string)
return err
}
- return enqueueHookTask(task)
+ return enqueueHookTask(task.ID)
}