diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /modules/queue/setting.go | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'modules/queue/setting.go')
-rw-r--r-- | modules/queue/setting.go | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/modules/queue/setting.go b/modules/queue/setting.go index 8760c09ae8..c47e85f756 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -7,6 +7,7 @@ package queue import ( "encoding/json" "fmt" + "strings" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) { opts["Password"] = q.Password opts["DBIndex"] = q.DBIndex opts["QueueName"] = q.QueueName + opts["SetName"] = q.SetName opts["Workers"] = q.Workers opts["MaxWorkers"] = q.MaxWorkers opts["BlockTimeout"] = q.BlockTimeout @@ -81,3 +83,41 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { } return returnable } + +// CreateUniqueQueue for name with provided handler and exemplar +func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue { + q, cfg := getQueueSettings(name) + if len(cfg) == 0 { + return nil + } + + if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") { + q.Type = "unique-" + q.Type + } + + typ, err := validType(q.Type) + if err != nil || typ == PersistableChannelQueueType { + typ = PersistableChannelUniqueQueueType + if err != nil { + log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) + } + } + + returnable, err := NewQueue(typ, handle, cfg, exemplar) + if q.WrapIfNecessary && err != nil { + log.Warn("Unable to create unique queue for %s: %v", name, err) + log.Warn("Attempting to create wrapped queue") + returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{ + Underlying: typ, + Timeout: q.Timeout, + MaxAttempts: q.MaxAttempts, + Config: cfg, + QueueLength: q.Length, + }, exemplar) + } + if err != nil { + log.Error("Unable to create unique queue for %s: %v", name, err) + return nil + } + return returnable.(UniqueQueue) +} |