diff options
author | zeripath <art27@cantab.net> | 2020-01-29 01:01:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-28 20:01:06 -0500 |
commit | c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3 (patch) | |
tree | 4017848a786da2080e9a003a77bd40bd81625680 /modules/queue/setting.go | |
parent | 7c84dbca4f0f79dc90752105800a6964693283bd (diff) | |
download | gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.tar.gz gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.zip |
Queue: Make WorkerPools and Queues flushable (#10001)
* Make WorkerPools and Queues flushable
Adds Flush methods to Queues and the WorkerPool
Further abstracts the WorkerPool
Adds a final step to Flush the queues in the defer from PrintCurrentTest
Fixes an issue with Settings inheritance in queues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Change to for loop
* Add IsEmpty and begin just making the queues composed WorkerPools
* subsume workerpool into the queues and create a flushable interface
* Add manager command
* Move flushall to queue.Manager and add to testlogger
* As per @guillep2k
* as per @guillep2k
* Just make queues all implement flushable and clean up the wrapped queue flushes
* cope with no timeout
Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue/setting.go')
-rw-r--r-- | modules/queue/setting.go | 24 |
1 files changed, 16 insertions, 8 deletions
diff --git a/modules/queue/setting.go b/modules/queue/setting.go index d5a6b41882..8760c09ae8 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -24,8 +24,7 @@ func validType(t string) (Type, error) { return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType)) } -// CreateQueue for name with provided handler and exemplar -func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { +func getQueueSettings(name string) (setting.QueueSettings, []byte) { q := setting.GetQueueSettings(name) opts := make(map[string]interface{}) opts["Name"] = name @@ -43,24 +42,33 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { opts["BoostTimeout"] = q.BoostTimeout opts["BoostWorkers"] = q.BoostWorkers - typ, err := validType(q.Type) - if err != nil { - log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) - } - cfg, err := json.Marshal(opts) if err != nil { log.Error("Unable to marshall generic options: %v Error: %v", opts, err) log.Error("Unable to create queue for %s", name, err) + return q, []byte{} + } + return q, cfg +} + +// CreateQueue for name with provided handler and exemplar +func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { + q, cfg := getQueueSettings(name) + if len(cfg) == 0 { return nil } + typ, err := validType(q.Type) + if err != nil { + log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) + } + returnable, err := NewQueue(typ, handle, cfg, exemplar) if q.WrapIfNecessary && err != nil { log.Warn("Unable to create queue for %s: %v", name, err) log.Warn("Attempting to create wrapped queue") returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{ - Underlying: Type(q.Type), + Underlying: typ, Timeout: q.Timeout, MaxAttempts: q.MaxAttempts, Config: cfg, |