summaryrefslogtreecommitdiffstats
path: root/models/webhook.go
diff options
context:
space:
mode:
Diffstat (limited to 'models/webhook.go')
-rw-r--r--models/webhook.go75
1 files changed, 46 insertions, 29 deletions
diff --git a/models/webhook.go b/models/webhook.go
index 75380d17c1..7b6d7826e7 100644
--- a/models/webhook.go
+++ b/models/webhook.go
@@ -13,6 +13,7 @@ import (
"sync"
"time"
+ "github.com/Unknwon/com"
"github.com/go-xorm/xorm"
api "github.com/gogits/go-gogs-client"
@@ -435,39 +436,58 @@ func PrepareWebhooks(repo *Repository, event HookEventType, p api.Payloader) err
return nil
}
-type hookQueue struct {
- // Make sure one repository only occur once in the queue.
- lock sync.Mutex
- repoIDs map[int64]bool
+// UniqueQueue represents a queue that guarantees only one instance of same ID is in the line.
+type UniqueQueue struct {
+ lock sync.Mutex
+ ids map[string]bool
- queue chan int64
+ queue chan string
}
-func (q *hookQueue) removeRepoID(id int64) {
+func (q *UniqueQueue) Queue() <-chan string {
+ return q.queue
+}
+
+func NewUniqueQueue(queueLength int) *UniqueQueue {
+ if queueLength <= 0 {
+ queueLength = 100
+ }
+
+ return &UniqueQueue{
+ ids: make(map[string]bool),
+ queue: make(chan string, queueLength),
+ }
+}
+
+func (q *UniqueQueue) Remove(id interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
- delete(q.repoIDs, id)
+ delete(q.ids, com.ToStr(id))
}
-func (q *hookQueue) addRepoID(id int64) {
- q.lock.Lock()
- if q.repoIDs[id] {
- q.lock.Unlock()
+func (q *UniqueQueue) Add(id interface{}) {
+ newid := com.ToStr(id)
+
+ if q.Exist(id) {
return
}
- q.repoIDs[id] = true
+
+ q.lock.Lock()
+ q.ids[newid] = true
q.lock.Unlock()
- q.queue <- id
+ q.queue <- newid
}
-// AddRepoID adds repository ID to hook delivery queue.
-func (q *hookQueue) AddRepoID(id int64) {
- go q.addRepoID(id)
+func (q *UniqueQueue) Exist(id interface{}) bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.ids[com.ToStr(id)]
}
-var HookQueue *hookQueue
+var HookQueue = NewUniqueQueue(setting.Webhook.QueueLength)
-func deliverHook(t *HookTask) {
+func (t *HookTask) deliver() {
t.IsDelivered = true
timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
@@ -549,12 +569,13 @@ func deliverHook(t *HookTask) {
}
// DeliverHooks checks and delivers undelivered hooks.
+// TODO: shoot more hooks at same time.
func DeliverHooks() {
tasks := make([]*HookTask, 0, 10)
x.Where("is_delivered=?", false).Iterate(new(HookTask),
func(idx int, bean interface{}) error {
t := bean.(*HookTask)
- deliverHook(t)
+ t.deliver()
tasks = append(tasks, t)
return nil
})
@@ -566,15 +587,10 @@ func DeliverHooks() {
}
}
- HookQueue = &hookQueue{
- lock: sync.Mutex{},
- repoIDs: make(map[int64]bool),
- queue: make(chan int64, setting.Webhook.QueueLength),
- }
-
// Start listening on new hook requests.
- for repoID := range HookQueue.queue {
- HookQueue.removeRepoID(repoID)
+ for repoID := range HookQueue.Queue() {
+ log.Trace("DeliverHooks[%v]: processing delivery hooks", repoID)
+ HookQueue.Remove(repoID)
tasks = make([]*HookTask, 0, 5)
if err := x.Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil {
@@ -582,9 +598,10 @@ func DeliverHooks() {
continue
}
for _, t := range tasks {
- deliverHook(t)
+ t.deliver()
if err := UpdateHookTask(t); err != nil {
- log.Error(4, "UpdateHookTask(%d): %v", t.ID, err)
+ log.Error(4, "UpdateHookTask[%d]: %v", t.ID, err)
+ continue
}
}
}