summaryrefslogtreecommitdiffstats
path: root/models
diff options
context:
space:
mode:
Diffstat (limited to 'models')
-rw-r--r--models/action.go2
-rw-r--r--models/webhook.go172
2 files changed, 115 insertions, 59 deletions
diff --git a/models/action.go b/models/action.go
index 86520b57b1..2c16a575b6 100644
--- a/models/action.go
+++ b/models/action.go
@@ -431,6 +431,8 @@ func CommitRepoAction(userId, repoUserId int64, userName, actEmail string,
}
if err = CreateHookTask(&HookTask{
+ RepoID: repo.Id,
+ HookID: w.Id,
Type: w.HookTaskType,
Url: w.Url,
BasePayload: payload,
diff --git a/models/webhook.go b/models/webhook.go
index bfa52b9902..18cda74871 100644
--- a/models/webhook.go
+++ b/models/webhook.go
@@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"io/ioutil"
+ "sync"
"time"
"github.com/gogits/gogs/modules/httplib"
@@ -259,7 +260,9 @@ func (p Payload) GetJSONPayload() ([]byte, error) {
// HookTask represents a hook task.
type HookTask struct {
- Id int64
+ ID int64 `xorm:"pk autoincr"`
+ RepoID int64 `xorm:"INDEX"`
+ HookID int64
Uuid string
Type HookTaskType
Url string
@@ -269,6 +272,7 @@ type HookTask struct {
EventType HookEventType
IsSsl bool
IsDelivered bool
+ Delivered int64
IsSucceed bool
}
@@ -287,87 +291,137 @@ func CreateHookTask(t *HookTask) error {
// UpdateHookTask updates information of hook task.
func UpdateHookTask(t *HookTask) error {
- _, err := x.Id(t.Id).AllCols().Update(t)
+ _, err := x.Id(t.ID).AllCols().Update(t)
return err
}
-var (
- // Prevent duplicate deliveries.
- // This happens with massive hook tasks cannot finish delivering
- // before next shooting starts.
- isShooting = false
-)
+type hookQueue struct {
+ // Make sure one repository only occur once in the queue.
+ lock sync.Mutex
+ repoIDs map[int64]bool
-// DeliverHooks checks and delivers undelivered hooks.
-// FIXME: maybe can use goroutine to shoot a number of them at same time?
-func DeliverHooks() {
- if isShooting {
+ queue chan int64
+}
+
+func (q *hookQueue) removeRepoID(id int64) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ delete(q.repoIDs, id)
+}
+
+func (q *hookQueue) addRepoID(id int64) {
+ q.lock.Lock()
+ if q.repoIDs[id] {
+ q.lock.Unlock()
return
}
- isShooting = true
- defer func() { isShooting = false }()
+ q.repoIDs[id] = true
+ q.lock.Unlock()
+ q.queue <- id
+}
- tasks := make([]*HookTask, 0, 10)
+// AddRepoID adds repository ID to hook delivery queue.
+func (q *hookQueue) AddRepoID(id int64) {
+ go q.addRepoID(id)
+}
+
+var HookQueue *hookQueue
+
+func deliverHook(t *HookTask) {
timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
- x.Where("is_delivered=?", false).Iterate(new(HookTask),
- func(idx int, bean interface{}) error {
- t := bean.(*HookTask)
- req := httplib.Post(t.Url).SetTimeout(timeout, timeout).
- Header("X-Gogs-Delivery", t.Uuid).
- Header("X-Gogs-Event", string(t.EventType)).
- SetTLSClientConfig(&tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify})
-
- switch t.ContentType {
- case JSON:
- req = req.Header("Content-Type", "application/json").Body(t.PayloadContent)
- case FORM:
- req.Param("payload", t.PayloadContent)
- }
+ req := httplib.Post(t.Url).SetTimeout(timeout, timeout).
+ Header("X-Gogs-Delivery", t.Uuid).
+ Header("X-Gogs-Event", string(t.EventType)).
+ SetTLSClientConfig(&tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify})
- t.IsDelivered = true
+ switch t.ContentType {
+ case JSON:
+ req = req.Header("Content-Type", "application/json").Body(t.PayloadContent)
+ case FORM:
+ req.Param("payload", t.PayloadContent)
+ }
- // FIXME: record response.
- switch t.Type {
- case GOGS:
- {
- if _, err := req.Response(); err != nil {
- log.Error(5, "Delivery: %v", err)
+ t.IsDelivered = true
+
+ // FIXME: record response.
+ switch t.Type {
+ case GOGS:
+ {
+ if resp, err := req.Response(); err != nil {
+ log.Error(5, "Delivery: %v", err)
+ } else {
+ resp.Body.Close()
+ t.IsSucceed = true
+ }
+ }
+ case SLACK:
+ {
+ if resp, err := req.Response(); err != nil {
+ log.Error(5, "Delivery: %v", err)
+ } else {
+ defer resp.Body.Close()
+ contents, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ log.Error(5, "%s", err)
+ } else {
+ if string(contents) != "ok" {
+ log.Error(5, "slack failed with: %s", string(contents))
} else {
t.IsSucceed = true
}
}
- case SLACK:
- {
- if res, err := req.Response(); err != nil {
- log.Error(5, "Delivery: %v", err)
- } else {
- defer res.Body.Close()
- contents, err := ioutil.ReadAll(res.Body)
- if err != nil {
- log.Error(5, "%s", err)
- } else {
- if string(contents) != "ok" {
- log.Error(5, "slack failed with: %s", string(contents))
- } else {
- t.IsSucceed = true
- }
- }
- }
- }
}
+ }
+ }
- tasks = append(tasks, t)
+ t.Delivered = time.Now().UTC().UnixNano()
+ if t.IsSucceed {
+ log.Trace("Hook delivered(%s): %s", t.Uuid, t.PayloadContent)
+ }
+}
- if t.IsSucceed {
- log.Trace("Hook delivered(%s): %s", t.Uuid, t.PayloadContent)
- }
+// DeliverHooks checks and delivers undelivered hooks.
+func DeliverHooks() {
+ tasks := make([]*HookTask, 0, 10)
+ x.Where("is_delivered=?", false).Iterate(new(HookTask),
+ func(idx int, bean interface{}) error {
+ t := bean.(*HookTask)
+ deliverHook(t)
+ tasks = append(tasks, t)
return nil
})
// Update hook task status.
for _, t := range tasks {
if err := UpdateHookTask(t); err != nil {
- log.Error(4, "UpdateHookTask(%d): %v", t.Id, err)
+ log.Error(4, "UpdateHookTask(%d): %v", t.ID, err)
+ }
+ }
+
+ HookQueue = &hookQueue{
+ lock: sync.Mutex{},
+ repoIDs: make(map[int64]bool),
+ queue: make(chan int64, setting.Webhook.QueueLength),
+ }
+
+ // Start listening on new hook requests.
+ for repoID := range HookQueue.queue {
+ HookQueue.removeRepoID(repoID)
+
+ tasks = make([]*HookTask, 0, 5)
+ if err := x.Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil {
+ log.Error(4, "Get repository(%d) hook tasks: %v", repoID, err)
+ continue
+ }
+ for _, t := range tasks {
+ deliverHook(t)
+ if err := UpdateHookTask(t); err != nil {
+ log.Error(4, "UpdateHookTask(%d): %v", t.ID, err)
+ }
}
}
}
+
+func InitDeliverHooks() {
+ go DeliverHooks()
+}