diff options
author | zeripath <art27@cantab.net> | 2020-01-29 01:01:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-28 20:01:06 -0500 |
commit | c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3 (patch) | |
tree | 4017848a786da2080e9a003a77bd40bd81625680 /modules/queue/queue_channel.go | |
parent | 7c84dbca4f0f79dc90752105800a6964693283bd (diff) | |
download | gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.tar.gz gitea-c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3.zip |
Queue: Make WorkerPools and Queues flushable (#10001)
* Make WorkerPools and Queues flushable
Adds Flush methods to Queues and the WorkerPool
Further abstracts the WorkerPool
Adds a final step to Flush the queues in the defer from PrintCurrentTest
Fixes an issue with Settings inheritance in queues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Change to for loop
* Add IsEmpty and begin just making the queues composed WorkerPools
* subsume workerpool into the queues and create a flushable interface
* Add manager command
* Move flushall to queue.Manager and add to testlogger
* As per @guillep2k
* as per @guillep2k
* Just make queues all implement flushable and clean up the wrapped queue flushes
* cope with no timeout
Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue/queue_channel.go')
-rw-r--r-- | modules/queue/queue_channel.go | 59 |
1 files changed, 19 insertions, 40 deletions
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index c8f8a53804..45df8a443e 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,8 +7,6 @@ package queue import ( "context" "fmt" - "reflect" - "time" "code.gitea.io/gitea/modules/log" ) @@ -18,25 +16,23 @@ const ChannelQueueType Type = "channel" // ChannelQueueConfiguration is the configuration for a ChannelQueue type ChannelQueueConfiguration struct { - QueueLength int - BatchLength int - Workers int - MaxWorkers int - BlockTimeout time.Duration - BoostTimeout time.Duration - BoostWorkers int - Name string + WorkerPoolConfiguration + Workers int + Name string } -// ChannelQueue implements +// ChannelQueue implements Queue +// +// A channel queue is not persistable and does not shutdown or terminate cleanly +// It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { - pool *WorkerPool + *WorkerPool exemplar interface{} workers int name string } -// NewChannelQueue create a memory channel queue +// NewChannelQueue creates a memory channel queue func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg) if err != nil { @@ -46,26 +42,13 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro if config.BatchLength == 0 { config.BatchLength = 1 } - dataChan := make(chan Data, config.QueueLength) - - ctx, cancel := context.WithCancel(context.Background()) queue := &ChannelQueue{ - pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, - maxNumberOfWorkers: config.MaxWorkers, - }, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } - queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool) + queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } @@ -77,22 +60,18 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()) atTerminate(context.Background(), func() { log.Warn("ChannelQueue: %s is not terminatable!", c.name) }) + log.Debug("ChannelQueue: %s Starting", c.name) go func() { - _ = c.pool.AddWorkers(c.workers, 0) + _ = c.AddWorkers(c.workers, 0) }() } // Push will push data into the queue func (c *ChannelQueue) Push(data Data) error { - if c.exemplar != nil { - // Assert data is of same type as r.exemplar - t := reflect.TypeOf(data) - exemplarType := reflect.TypeOf(c.exemplar) - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) - } + if !assignableTo(data, c.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) } - c.pool.Push(data) + c.WorkerPool.Push(data) return nil } |