diff options
author | Unknwon <u@gogs.io> | 2015-07-25 21:32:04 +0800 |
---|---|---|
committer | Unknwon <u@gogs.io> | 2015-07-25 21:32:04 +0800 |
commit | fa298a2c30c358dbfa47fc123c6aca83fe9eb999 (patch) | |
tree | 2179828e8b7c8ebafe3121506f26d52cb0350c63 /models/webhook.go | |
parent | 2b1442f3dfa3558f4e1a9dd5b1dbb50cf348e6e0 (diff) | |
download | gitea-fa298a2c30c358dbfa47fc123c6aca83fe9eb999.tar.gz gitea-fa298a2c30c358dbfa47fc123c6aca83fe9eb999.zip |
#835: Realtime webhooks
Diffstat (limited to 'models/webhook.go')
-rw-r--r-- | models/webhook.go | 172 |
1 files changed, 113 insertions, 59 deletions
diff --git a/models/webhook.go b/models/webhook.go index bfa52b9902..18cda74871 100644 --- a/models/webhook.go +++ b/models/webhook.go @@ -9,6 +9,7 @@ import ( "encoding/json" "errors" "io/ioutil" + "sync" "time" "github.com/gogits/gogs/modules/httplib" @@ -259,7 +260,9 @@ func (p Payload) GetJSONPayload() ([]byte, error) { // HookTask represents a hook task. type HookTask struct { - Id int64 + ID int64 `xorm:"pk autoincr"` + RepoID int64 `xorm:"INDEX"` + HookID int64 Uuid string Type HookTaskType Url string @@ -269,6 +272,7 @@ type HookTask struct { EventType HookEventType IsSsl bool IsDelivered bool + Delivered int64 IsSucceed bool } @@ -287,87 +291,137 @@ func CreateHookTask(t *HookTask) error { // UpdateHookTask updates information of hook task. func UpdateHookTask(t *HookTask) error { - _, err := x.Id(t.Id).AllCols().Update(t) + _, err := x.Id(t.ID).AllCols().Update(t) return err } -var ( - // Prevent duplicate deliveries. - // This happens with massive hook tasks cannot finish delivering - // before next shooting starts. - isShooting = false -) +type hookQueue struct { + // Make sure one repository only occur once in the queue. + lock sync.Mutex + repoIDs map[int64]bool -// DeliverHooks checks and delivers undelivered hooks. -// FIXME: maybe can use goroutine to shoot a number of them at same time? -func DeliverHooks() { - if isShooting { + queue chan int64 +} + +func (q *hookQueue) removeRepoID(id int64) { + q.lock.Lock() + defer q.lock.Unlock() + delete(q.repoIDs, id) +} + +func (q *hookQueue) addRepoID(id int64) { + q.lock.Lock() + if q.repoIDs[id] { + q.lock.Unlock() return } - isShooting = true - defer func() { isShooting = false }() + q.repoIDs[id] = true + q.lock.Unlock() + q.queue <- id +} - tasks := make([]*HookTask, 0, 10) +// AddRepoID adds repository ID to hook delivery queue. +func (q *hookQueue) AddRepoID(id int64) { + go q.addRepoID(id) +} + +var HookQueue *hookQueue + +func deliverHook(t *HookTask) { timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second - x.Where("is_delivered=?", false).Iterate(new(HookTask), - func(idx int, bean interface{}) error { - t := bean.(*HookTask) - req := httplib.Post(t.Url).SetTimeout(timeout, timeout). - Header("X-Gogs-Delivery", t.Uuid). - Header("X-Gogs-Event", string(t.EventType)). - SetTLSClientConfig(&tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify}) - - switch t.ContentType { - case JSON: - req = req.Header("Content-Type", "application/json").Body(t.PayloadContent) - case FORM: - req.Param("payload", t.PayloadContent) - } + req := httplib.Post(t.Url).SetTimeout(timeout, timeout). + Header("X-Gogs-Delivery", t.Uuid). + Header("X-Gogs-Event", string(t.EventType)). + SetTLSClientConfig(&tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify}) - t.IsDelivered = true + switch t.ContentType { + case JSON: + req = req.Header("Content-Type", "application/json").Body(t.PayloadContent) + case FORM: + req.Param("payload", t.PayloadContent) + } - // FIXME: record response. - switch t.Type { - case GOGS: - { - if _, err := req.Response(); err != nil { - log.Error(5, "Delivery: %v", err) + t.IsDelivered = true + + // FIXME: record response. + switch t.Type { + case GOGS: + { + if resp, err := req.Response(); err != nil { + log.Error(5, "Delivery: %v", err) + } else { + resp.Body.Close() + t.IsSucceed = true + } + } + case SLACK: + { + if resp, err := req.Response(); err != nil { + log.Error(5, "Delivery: %v", err) + } else { + defer resp.Body.Close() + contents, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Error(5, "%s", err) + } else { + if string(contents) != "ok" { + log.Error(5, "slack failed with: %s", string(contents)) } else { t.IsSucceed = true } } - case SLACK: - { - if res, err := req.Response(); err != nil { - log.Error(5, "Delivery: %v", err) - } else { - defer res.Body.Close() - contents, err := ioutil.ReadAll(res.Body) - if err != nil { - log.Error(5, "%s", err) - } else { - if string(contents) != "ok" { - log.Error(5, "slack failed with: %s", string(contents)) - } else { - t.IsSucceed = true - } - } - } - } } + } + } - tasks = append(tasks, t) + t.Delivered = time.Now().UTC().UnixNano() + if t.IsSucceed { + log.Trace("Hook delivered(%s): %s", t.Uuid, t.PayloadContent) + } +} - if t.IsSucceed { - log.Trace("Hook delivered(%s): %s", t.Uuid, t.PayloadContent) - } +// DeliverHooks checks and delivers undelivered hooks. +func DeliverHooks() { + tasks := make([]*HookTask, 0, 10) + x.Where("is_delivered=?", false).Iterate(new(HookTask), + func(idx int, bean interface{}) error { + t := bean.(*HookTask) + deliverHook(t) + tasks = append(tasks, t) return nil }) // Update hook task status. for _, t := range tasks { if err := UpdateHookTask(t); err != nil { - log.Error(4, "UpdateHookTask(%d): %v", t.Id, err) + log.Error(4, "UpdateHookTask(%d): %v", t.ID, err) + } + } + + HookQueue = &hookQueue{ + lock: sync.Mutex{}, + repoIDs: make(map[int64]bool), + queue: make(chan int64, setting.Webhook.QueueLength), + } + + // Start listening on new hook requests. + for repoID := range HookQueue.queue { + HookQueue.removeRepoID(repoID) + + tasks = make([]*HookTask, 0, 5) + if err := x.Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil { + log.Error(4, "Get repository(%d) hook tasks: %v", repoID, err) + continue + } + for _, t := range tasks { + deliverHook(t) + if err := UpdateHookTask(t); err != nil { + log.Error(4, "UpdateHookTask(%d): %v", t.ID, err) + } } } } + +func InitDeliverHooks() { + go DeliverHooks() +} |