diff options
-rw-r--r-- | modules/sync/unique_queue.go | 104 | ||||
-rw-r--r-- | routers/api/v1/repo/main_test.go | 5 | ||||
-rw-r--r-- | routers/init.go | 2 | ||||
-rw-r--r-- | services/webhook/deliver.go | 59 | ||||
-rw-r--r-- | services/webhook/main_test.go | 4 | ||||
-rw-r--r-- | services/webhook/webhook.go | 49 |
6 files changed, 66 insertions, 157 deletions
diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go deleted file mode 100644 index df115d7c96..0000000000 --- a/modules/sync/unique_queue.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2016 The Gogs Authors. All rights reserved. -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package sync - -// UniqueQueue is a queue which guarantees only one instance of same -// identity is in the line. Instances with same identity will be -// discarded if there is already one in the line. -// -// This queue is particularly useful for preventing duplicated task -// of same purpose. -type UniqueQueue struct { - table *StatusTable - queue chan string - closed chan struct{} -} - -// NewUniqueQueue initializes and returns a new UniqueQueue object. -func NewUniqueQueue(queueLength int) *UniqueQueue { - if queueLength <= 0 { - queueLength = 100 - } - - return &UniqueQueue{ - table: NewStatusTable(), - queue: make(chan string, queueLength), - closed: make(chan struct{}), - } -} - -// Close closes this queue -func (q *UniqueQueue) Close() { - select { - case <-q.closed: - default: - q.table.lock.Lock() - select { - case <-q.closed: - default: - close(q.closed) - } - q.table.lock.Unlock() - } -} - -// IsClosed returns a channel that is closed when this Queue is closed -func (q *UniqueQueue) IsClosed() <-chan struct{} { - return q.closed -} - -// IDs returns the current ids in the pool -func (q *UniqueQueue) IDs() []string { - q.table.lock.Lock() - defer q.table.lock.Unlock() - ids := make([]string, 0, len(q.table.pool)) - for id := range q.table.pool { - ids = append(ids, id) - } - return ids -} - -// Queue returns channel of queue for retrieving instances. -func (q *UniqueQueue) Queue() <-chan string { - return q.queue -} - -// Exist returns true if there is an instance with given identity -// exists in the queue. -func (q *UniqueQueue) Exist(id string) bool { - return q.table.IsRunning(id) -} - -// AddFunc adds new instance to the queue with a custom runnable function, -// the queue is blocked until the function exits. -func (q *UniqueQueue) AddFunc(id string, fn func()) { - q.table.lock.Lock() - if _, ok := q.table.pool[id]; ok { - q.table.lock.Unlock() - return - } - q.table.pool[id] = struct{}{} - if fn != nil { - fn() - } - q.table.lock.Unlock() - select { - case <-q.closed: - return - case q.queue <- id: - return - } -} - -// Add adds new instance to the queue. -func (q *UniqueQueue) Add(id string) { - q.AddFunc(id, nil) -} - -// Remove removes instance from the queue. -func (q *UniqueQueue) Remove(id string) { - q.table.Stop(id) -} diff --git a/routers/api/v1/repo/main_test.go b/routers/api/v1/repo/main_test.go index 19e524d014..1f91a24937 100644 --- a/routers/api/v1/repo/main_test.go +++ b/routers/api/v1/repo/main_test.go @@ -9,10 +9,15 @@ import ( "testing" "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/setting" + webhook_service "code.gitea.io/gitea/services/webhook" ) func TestMain(m *testing.M) { + setting.LoadForTest() + setting.NewQueueService() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", "..", "..", ".."), + SetUp: webhook_service.Init, }) } diff --git a/routers/init.go b/routers/init.go index 88c393736e..403fab00cd 100644 --- a/routers/init.go +++ b/routers/init.go @@ -145,7 +145,7 @@ func GlobalInitInstalled(ctx context.Context) { mustInit(stats_indexer.Init) mirror_service.InitSyncMirrors() - webhook.InitDeliverHooks() + mustInit(webhook.Init) mustInit(pull_service.Init) mustInit(task.Init) mustInit(repo_migrations.Init) diff --git a/services/webhook/deliver.go b/services/webhook/deliver.go index 7998be53c2..77744473f1 100644 --- a/services/webhook/deliver.go +++ b/services/webhook/deliver.go @@ -15,7 +15,6 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "sync" "time" @@ -26,6 +25,7 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/proxy" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "github.com/gobwas/glob" @@ -202,10 +202,8 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { return nil } -// DeliverHooks checks and delivers undelivered hooks. -// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue -// or a full queue. Then more hooks could be sent at same time. -func DeliverHooks(ctx context.Context) { +// populateDeliverHooks checks and delivers undelivered hooks. +func populateDeliverHooks(ctx context.Context) { select { case <-ctx.Done(): return @@ -226,42 +224,9 @@ func DeliverHooks(ctx context.Context) { return default: } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } - - // Start listening on new hook requests. - for { - select { - case <-ctx.Done(): - hookQueue.Close() - return - case repoIDStr := <-hookQueue.Queue(): - log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) - hookQueue.Remove(repoIDStr) - - repoID, err := strconv.ParseInt(repoIDStr, 10, 64) - if err != nil { - log.Error("Invalid repo ID: %s", repoIDStr) - continue - } - tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) - if err != nil { - log.Error("Get repository [%d] hook tasks: %v", repoID, err) - continue - } - for _, t := range tasks { - select { - case <-ctx.Done(): - return - default: - } - if err = Deliver(ctx, t); err != nil { - log.Error("deliver: %v", err) - } - } + if err := addToTask(t.RepoID); err != nil { + log.Error("DeliverHook failed [%d]: %v", t.RepoID, err) } } } @@ -297,8 +262,8 @@ func webhookProxy() func(req *http.Request) (*url.URL, error) { } } -// InitDeliverHooks starts the hooks delivery thread -func InitDeliverHooks() { +// Init starts the hooks delivery thread +func Init() error { timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second allowedHostListValue := setting.Webhook.AllowedHostList @@ -316,5 +281,13 @@ func InitDeliverHooks() { }, } - go graceful.GetManager().RunWithShutdownContext(DeliverHooks) + hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "") + if hookQueue == nil { + return fmt.Errorf("Unable to create webhook_sender Queue") + } + go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) + + populateDeliverHooks(graceful.GetManager().HammerContext()) + + return nil } diff --git a/services/webhook/main_test.go b/services/webhook/main_test.go index 25b9df0af6..1dc2e1bd83 100644 --- a/services/webhook/main_test.go +++ b/services/webhook/main_test.go @@ -9,12 +9,16 @@ import ( "testing" "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/setting" _ "code.gitea.io/gitea/models" ) func TestMain(m *testing.M) { + setting.LoadForTest() + setting.NewQueueService() unittest.MainTest(m, &unittest.TestOptions{ GiteaRootPath: filepath.Join("..", ".."), + SetUp: Init, }) } diff --git a/services/webhook/webhook.go b/services/webhook/webhook.go index a3efc7535f..b15b8173f5 100644 --- a/services/webhook/webhook.go +++ b/services/webhook/webhook.go @@ -12,10 +12,11 @@ import ( repo_model "code.gitea.io/gitea/models/repo" webhook_model "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" api "code.gitea.io/gitea/modules/structs" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" "github.com/gobwas/glob" @@ -80,7 +81,7 @@ func IsValidHookTaskType(name string) bool { } // hookQueue is a global queue of web hooks -var hookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength) +var hookQueue queue.UniqueQueue // getPayloadBranch returns branch for hook event, if applicable. func getPayloadBranch(p api.Payloader) string { @@ -101,14 +102,47 @@ func getPayloadBranch(p api.Payloader) string { return "" } +// handle passed PR IDs and test the PRs +func handle(data ...queue.Data) []queue.Data { + for _, datum := range data { + repoIDStr := datum.(string) + log.Trace("DeliverHooks [repo_id: %v]", repoIDStr) + + repoID, err := strconv.ParseInt(repoIDStr, 10, 64) + if err != nil { + log.Error("Invalid repo ID: %s", repoIDStr) + continue + } + + tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID) + if err != nil { + log.Error("Get repository [%d] hook tasks: %v", repoID, err) + continue + } + for _, t := range tasks { + if err = Deliver(graceful.GetManager().HammerContext(), t); err != nil { + log.Error("deliver: %v", err) + } + } + } + return nil +} + +func addToTask(repoID int64) error { + err := hookQueue.PushFunc(strconv.FormatInt(repoID, 10), nil) + if err != nil && err != queue.ErrAlreadyInQueue { + return err + } + return nil +} + // PrepareWebhook adds special webhook to task queue for given payload. func PrepareWebhook(w *webhook_model.Webhook, repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { if err := prepareWebhook(w, repo, event, p); err != nil { return err } - go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) - return nil + return addToTask(repo.ID) } func checkBranch(w *webhook_model.Webhook, branch string) bool { @@ -188,8 +222,7 @@ func PrepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventT return err } - go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) - return nil + return addToTask(repo.ID) } func prepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { @@ -240,7 +273,5 @@ func ReplayHookTask(w *webhook_model.Webhook, uuid string) error { return err } - go hookQueue.Add(strconv.FormatInt(t.RepoID, 10)) - - return nil + return addToTask(t.RepoID) } |