aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/setting.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-02-02 23:19:58 +0000
committerGitHub <noreply@github.com>2020-02-02 23:19:58 +0000
commit2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch)
treed5ca361d9597e027ad92f1e02a841be1d266b554 /modules/queue/setting.go
parentb4914249ee389a733e7dcfd2df20708ab3215827 (diff)
downloadgitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz
gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'modules/queue/setting.go')
-rw-r--r--modules/queue/setting.go40
1 files changed, 40 insertions, 0 deletions
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
index 8760c09ae8..c47e85f756 100644
--- a/modules/queue/setting.go
+++ b/modules/queue/setting.go
@@ -7,6 +7,7 @@ package queue
import (
"encoding/json"
"fmt"
+ "strings"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) {
opts["Password"] = q.Password
opts["DBIndex"] = q.DBIndex
opts["QueueName"] = q.QueueName
+ opts["SetName"] = q.SetName
opts["Workers"] = q.Workers
opts["MaxWorkers"] = q.MaxWorkers
opts["BlockTimeout"] = q.BlockTimeout
@@ -81,3 +83,41 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
}
return returnable
}
+
+// CreateUniqueQueue for name with provided handler and exemplar
+func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
+ q, cfg := getQueueSettings(name)
+ if len(cfg) == 0 {
+ return nil
+ }
+
+ if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") {
+ q.Type = "unique-" + q.Type
+ }
+
+ typ, err := validType(q.Type)
+ if err != nil || typ == PersistableChannelQueueType {
+ typ = PersistableChannelUniqueQueueType
+ if err != nil {
+ log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
+ }
+ }
+
+ returnable, err := NewQueue(typ, handle, cfg, exemplar)
+ if q.WrapIfNecessary && err != nil {
+ log.Warn("Unable to create unique queue for %s: %v", name, err)
+ log.Warn("Attempting to create wrapped queue")
+ returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{
+ Underlying: typ,
+ Timeout: q.Timeout,
+ MaxAttempts: q.MaxAttempts,
+ Config: cfg,
+ QueueLength: q.Length,
+ }, exemplar)
+ }
+ if err != nil {
+ log.Error("Unable to create unique queue for %s: %v", name, err)
+ return nil
+ }
+ return returnable.(UniqueQueue)
+}