diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /modules/queue/queue_bytefifo.go | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'modules/queue/queue_bytefifo.go')
-rw-r--r-- | modules/queue/queue_bytefifo.go | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go new file mode 100644 index 0000000000..cad258bda8 --- /dev/null +++ b/modules/queue/queue_bytefifo.go @@ -0,0 +1,227 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue +type ByteFIFOQueueConfiguration struct { + WorkerPoolConfiguration + Workers int + Name string +} + +var _ (Queue) = &ByteFIFOQueue{} + +// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool +type ByteFIFOQueue struct { + *WorkerPool + byteFIFO ByteFIFO + typ Type + closed chan struct{} + terminated chan struct{} + exemplar interface{} + workers int + name string + lock sync.Mutex +} + +// NewByteFIFOQueue creates a new ByteFIFOQueue +func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) { + configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ByteFIFOQueueConfiguration) + + return &ByteFIFOQueue{ + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + closed: make(chan struct{}), + terminated: make(chan struct{}), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + }, nil +} + +// Name returns the name of this queue +func (q *ByteFIFOQueue) Name() string { + return q.name +} + +// Push pushes data to the fifo +func (q *ByteFIFOQueue) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc pushes data to the fifo +func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + return q.byteFIFO.PushFunc(bs, fn) +} + +// IsEmpty checks if the queue is empty +func (q *ByteFIFOQueue) IsEmpty() bool { + q.lock.Lock() + defer q.lock.Unlock() + if !q.WorkerPool.IsEmpty() { + return false + } + return q.byteFIFO.Len() == 0 +} + +// Run runs the bytefifo queue +func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + atShutdown(context.Background(), q.Shutdown) + atTerminate(context.Background(), q.Terminate) + log.Debug("%s: %s Starting", q.typ, q.name) + + go func() { + _ = q.AddWorkers(q.workers, 0) + }() + + go q.readToChan() + + log.Trace("%s: %s Waiting til closed", q.typ, q.name) + <-q.closed + log.Trace("%s: %s Waiting til done", q.typ, q.name) + q.Wait() + + log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) + ctx, cancel := context.WithCancel(context.Background()) + atTerminate(ctx, cancel) + q.CleanUp(ctx) + cancel() +} + +func (q *ByteFIFOQueue) readToChan() { + for { + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + default: + q.lock.Lock() + bs, err := q.byteFIFO.Pop() + if err != nil { + q.lock.Unlock() + log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) + time.Sleep(time.Millisecond * 100) + continue + } + + if len(bs) == 0 { + q.lock.Unlock() + time.Sleep(time.Millisecond * 100) + continue + } + + data, err := unmarshalAs(bs, q.exemplar) + if err != nil { + log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) + q.lock.Unlock() + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) + q.WorkerPool.Push(data) + q.lock.Unlock() + } + } +} + +// Shutdown processing from this queue +func (q *ByteFIFOQueue) Shutdown() { + log.Trace("%s: %s Shutting down", q.typ, q.name) + q.lock.Lock() + select { + case <-q.closed: + default: + close(q.closed) + } + q.lock.Unlock() + log.Debug("%s: %s Shutdown", q.typ, q.name) +} + +// Terminate this queue and close the queue +func (q *ByteFIFOQueue) Terminate() { + log.Trace("%s: %s Terminating", q.typ, q.name) + q.Shutdown() + q.lock.Lock() + select { + case <-q.terminated: + q.lock.Unlock() + return + default: + } + close(q.terminated) + q.lock.Unlock() + if log.IsDebug() { + log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) + } + if err := q.byteFIFO.Close(); err != nil { + log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) + } + log.Debug("%s: %s Terminated", q.typ, q.name) +} + +var _ (UniqueQueue) = &ByteFIFOUniqueQueue{} + +// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo +type ByteFIFOUniqueQueue struct { + ByteFIFOQueue +} + +// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue +func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) { + configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(ByteFIFOQueueConfiguration) + + return &ByteFIFOUniqueQueue{ + ByteFIFOQueue: ByteFIFOQueue{ + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + closed: make(chan struct{}), + terminated: make(chan struct{}), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + }, + }, nil +} + +// Has checks if the provided data is in the queue +func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { + if !assignableTo(data, q.exemplar) { + return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + return q.byteFIFO.(UniqueByteFIFO).Has(bs) +} |