diff options
Diffstat (limited to 'models/webhook.go')
-rw-r--r-- | models/webhook.go | 75 |
1 files changed, 46 insertions, 29 deletions
diff --git a/models/webhook.go b/models/webhook.go index 75380d17c1..7b6d7826e7 100644 --- a/models/webhook.go +++ b/models/webhook.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/Unknwon/com" "github.com/go-xorm/xorm" api "github.com/gogits/go-gogs-client" @@ -435,39 +436,58 @@ func PrepareWebhooks(repo *Repository, event HookEventType, p api.Payloader) err return nil } -type hookQueue struct { - // Make sure one repository only occur once in the queue. - lock sync.Mutex - repoIDs map[int64]bool +// UniqueQueue represents a queue that guarantees only one instance of same ID is in the line. +type UniqueQueue struct { + lock sync.Mutex + ids map[string]bool - queue chan int64 + queue chan string } -func (q *hookQueue) removeRepoID(id int64) { +func (q *UniqueQueue) Queue() <-chan string { + return q.queue +} + +func NewUniqueQueue(queueLength int) *UniqueQueue { + if queueLength <= 0 { + queueLength = 100 + } + + return &UniqueQueue{ + ids: make(map[string]bool), + queue: make(chan string, queueLength), + } +} + +func (q *UniqueQueue) Remove(id interface{}) { q.lock.Lock() defer q.lock.Unlock() - delete(q.repoIDs, id) + delete(q.ids, com.ToStr(id)) } -func (q *hookQueue) addRepoID(id int64) { - q.lock.Lock() - if q.repoIDs[id] { - q.lock.Unlock() +func (q *UniqueQueue) Add(id interface{}) { + newid := com.ToStr(id) + + if q.Exist(id) { return } - q.repoIDs[id] = true + + q.lock.Lock() + q.ids[newid] = true q.lock.Unlock() - q.queue <- id + q.queue <- newid } -// AddRepoID adds repository ID to hook delivery queue. -func (q *hookQueue) AddRepoID(id int64) { - go q.addRepoID(id) +func (q *UniqueQueue) Exist(id interface{}) bool { + q.lock.Lock() + defer q.lock.Unlock() + + return q.ids[com.ToStr(id)] } -var HookQueue *hookQueue +var HookQueue = NewUniqueQueue(setting.Webhook.QueueLength) -func deliverHook(t *HookTask) { +func (t *HookTask) deliver() { t.IsDelivered = true timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second @@ -549,12 +569,13 @@ func deliverHook(t *HookTask) { } // DeliverHooks checks and delivers undelivered hooks. +// TODO: shoot more hooks at same time. func DeliverHooks() { tasks := make([]*HookTask, 0, 10) x.Where("is_delivered=?", false).Iterate(new(HookTask), func(idx int, bean interface{}) error { t := bean.(*HookTask) - deliverHook(t) + t.deliver() tasks = append(tasks, t) return nil }) @@ -566,15 +587,10 @@ func DeliverHooks() { } } - HookQueue = &hookQueue{ - lock: sync.Mutex{}, - repoIDs: make(map[int64]bool), - queue: make(chan int64, setting.Webhook.QueueLength), - } - // Start listening on new hook requests. - for repoID := range HookQueue.queue { - HookQueue.removeRepoID(repoID) + for repoID := range HookQueue.Queue() { + log.Trace("DeliverHooks[%v]: processing delivery hooks", repoID) + HookQueue.Remove(repoID) tasks = make([]*HookTask, 0, 5) if err := x.Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil { @@ -582,9 +598,10 @@ func DeliverHooks() { continue } for _, t := range tasks { - deliverHook(t) + t.deliver() if err := UpdateHookTask(t); err != nil { - log.Error(4, "UpdateHookTask(%d): %v", t.ID, err) + log.Error(4, "UpdateHookTask[%d]: %v", t.ID, err) + continue } } } |