diff options
author | zeripath <art27@cantab.net> | 2020-01-29 01:01:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-28 20:01:06 -0500 |
commit | c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3 (patch) | |
tree | 4017848a786da2080e9a003a77bd40bd81625680 /modules/queue/queue.go | |
parent | 7c84dbca4f0f79dc90752105800a6964693283bd (diff) | |
download | gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.tar.gz gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.zip |
Queue: Make WorkerPools and Queues flushable (#10001)
* Make WorkerPools and Queues flushable
Adds Flush methods to Queues and the WorkerPool
Further abstracts the WorkerPool
Adds a final step to Flush the queues in the defer from PrintCurrentTest
Fixes an issue with Settings inheritance in queues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Change to for loop
* Add IsEmpty and begin just making the queues composed WorkerPools
* subsume workerpool into the queues and create a flushable interface
* Add manager command
* Move flushall to queue.Manager and add to testlogger
* As per @guillep2k
* as per @guillep2k
* Just make queues all implement flushable and clean up the wrapped queue flushes
* cope with no timeout
Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue/queue.go')
-rw-r--r-- | modules/queue/queue.go | 43 |
1 files changed, 20 insertions, 23 deletions
diff --git a/modules/queue/queue.go b/modules/queue/queue.go index d458a7d506..094699d4af 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -6,9 +6,8 @@ package queue import ( "context" - "encoding/json" "fmt" - "reflect" + "time" ) // ErrInvalidConfiguration is called when there is invalid configuration for a queue @@ -53,8 +52,11 @@ type Named interface { Name() string } -// Queue defines an interface to save an issue indexer queue +// Queue defines an interface of a queue-like item +// +// Queues will handle their own contents in the Run method type Queue interface { + Flushable Run(atShutdown, atTerminate func(context.Context, func())) Push(Data) error } @@ -71,32 +73,27 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro type DummyQueue struct { } -// Run starts to run the queue +// Run does nothing func (b *DummyQueue) Run(_, _ func(context.Context, func())) {} -// Push pushes data to the queue +// Push fakes a push of data to the queue func (b *DummyQueue) Push(Data) error { return nil } -func toConfig(exemplar, cfg interface{}) (interface{}, error) { - if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { - return cfg, nil - } +// Flush always returns nil +func (b *DummyQueue) Flush(time.Duration) error { + return nil +} - configBytes, ok := cfg.([]byte) - if !ok { - configStr, ok := cfg.(string) - if !ok { - return nil, ErrInvalidConfiguration{cfg: cfg} - } - configBytes = []byte(configStr) - } - newVal := reflect.New(reflect.TypeOf(exemplar)) - if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { - return nil, ErrInvalidConfiguration{cfg: cfg, err: err} - } - return newVal.Elem().Interface(), nil +// FlushWithContext always returns nil +func (b *DummyQueue) FlushWithContext(context.Context) error { + return nil +} + +// IsEmpty asserts that the queue is empty +func (b *DummyQueue) IsEmpty() bool { + return true } var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue} @@ -123,7 +120,7 @@ func RegisteredTypesAsString() []string { return types } -// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error +// NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) { newFn, ok := queuesMap[queueType] if !ok { |