aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_bytefifo.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2021-05-15 15:22:26 +0100
committerGitHub <noreply@github.com>2021-05-15 16:22:26 +0200
commitba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch)
treeddd9ff13b0da7b272b5a60445a997319cb0de882 /modules/queue/queue_bytefifo.go
parent9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff)
downloadgitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz
gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue/queue_bytefifo.go')
-rw-r--r--modules/queue/queue_bytefifo.go217
1 files changed, 133 insertions, 84 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index fe1fb7807e..3ea61aad0e 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -17,8 +17,9 @@ import (
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
- Workers int
- Name string
+ Workers int
+ Name string
+ WaitOnEmpty bool
}
var _ Queue = &ByteFIFOQueue{}
@@ -26,14 +27,18 @@ var _ Queue = &ByteFIFOQueue{}
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
type ByteFIFOQueue struct {
*WorkerPool
- byteFIFO ByteFIFO
- typ Type
- closed chan struct{}
- terminated chan struct{}
- exemplar interface{}
- workers int
- name string
- lock sync.Mutex
+ byteFIFO ByteFIFO
+ typ Type
+ shutdownCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ terminateCtx context.Context
+ terminateCtxCancel context.CancelFunc
+ exemplar interface{}
+ workers int
+ name string
+ lock sync.Mutex
+ waitOnEmpty bool
+ pushed chan struct{}
}
// NewByteFIFOQueue creates a new ByteFIFOQueue
@@ -44,15 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
}
config := configInterface.(ByteFIFOQueueConfiguration)
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
return &ByteFIFOQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- byteFIFO: byteFIFO,
- typ: typ,
- closed: make(chan struct{}),
- terminated: make(chan struct{}),
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ byteFIFO: byteFIFO,
+ typ: typ,
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
+ waitOnEmpty: config.WaitOnEmpty,
+ pushed: make(chan struct{}, 1),
}, nil
}
@@ -76,7 +88,15 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if err != nil {
return err
}
- return q.byteFIFO.PushFunc(bs, fn)
+ if q.waitOnEmpty {
+ defer func() {
+ select {
+ case q.pushed <- struct{}{}:
+ default:
+ }
+ }()
+ }
+ return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
}
// IsEmpty checks if the queue is empty
@@ -86,135 +106,160 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
if !q.WorkerPool.IsEmpty() {
return false
}
- return q.byteFIFO.Len() == 0
+ return q.byteFIFO.Len(q.terminateCtx) == 0
}
// Run runs the bytefifo queue
-func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), q.Shutdown)
- atTerminate(context.Background(), q.Terminate)
+func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)
- go func() {
- _ = q.AddWorkers(q.workers, 0)
- }()
+ _ = q.AddWorkers(q.workers, 0)
- go q.readToChan()
+ log.Trace("%s: %s Now running", q.typ, q.name)
+ q.readToChan()
- log.Trace("%s: %s Waiting til closed", q.typ, q.name)
- <-q.closed
+ <-q.shutdownCtx.Done()
log.Trace("%s: %s Waiting til done", q.typ, q.name)
q.Wait()
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
- ctx, cancel := context.WithCancel(context.Background())
- atTerminate(ctx, cancel)
- q.CleanUp(ctx)
- cancel()
+ q.CleanUp(q.terminateCtx)
+ q.terminateCtxCancel()
}
+const maxBackOffTime = time.Second * 3
+
func (q *ByteFIFOQueue) readToChan() {
// handle quick cancels
select {
- case <-q.closed:
+ case <-q.shutdownCtx.Done():
// tell the pool to shutdown.
- q.cancel()
+ q.baseCtxCancel()
return
default:
}
+ // Default backoff values
backOffTime := time.Millisecond * 100
- maxBackOffTime := time.Second * 3
- for {
- success, resetBackoff := q.doPop()
- if resetBackoff {
- backOffTime = 100 * time.Millisecond
- }
- if success {
+loop:
+ for {
+ err := q.doPop()
+ if err == errQueueEmpty {
+ log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
select {
- case <-q.closed:
- // tell the pool to shutdown.
- q.cancel()
+ case <-q.pushed:
+ // reset backOffTime
+ backOffTime = 100 * time.Millisecond
+ continue loop
+ case <-q.shutdownCtx.Done():
+ // Oops we've been shutdown whilst waiting
+ // Make sure the worker pool is shutdown too
+ q.baseCtxCancel()
return
- default:
}
- } else {
+ }
+
+ // Reset the backOffTime if there is no error or an unmarshalError
+ if err == nil || err == errUnmarshal {
+ backOffTime = 100 * time.Millisecond
+ }
+
+ if err != nil {
+ // Need to Backoff
select {
- case <-q.closed:
- // tell the pool to shutdown.
- q.cancel()
+ case <-q.shutdownCtx.Done():
+ // Oops we've been shutdown whilst backing off
+ // Make sure the worker pool is shutdown too
+ q.baseCtxCancel()
return
case <-time.After(backOffTime):
- }
- backOffTime += backOffTime / 2
- if backOffTime > maxBackOffTime {
- backOffTime = maxBackOffTime
+ // OK we've waited - so backoff a bit
+ backOffTime += backOffTime / 2
+ if backOffTime > maxBackOffTime {
+ backOffTime = maxBackOffTime
+ }
+ continue loop
}
}
+ select {
+ case <-q.shutdownCtx.Done():
+ // Oops we've been shutdown
+ // Make sure the worker pool is shutdown too
+ q.baseCtxCancel()
+ return
+ default:
+ continue loop
+ }
}
}
-func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
+var errQueueEmpty = fmt.Errorf("empty queue")
+var errEmptyBytes = fmt.Errorf("empty bytes")
+var errUnmarshal = fmt.Errorf("failed to unmarshal")
+
+func (q *ByteFIFOQueue) doPop() error {
q.lock.Lock()
defer q.lock.Unlock()
- bs, err := q.byteFIFO.Pop()
+ bs, err := q.byteFIFO.Pop(q.shutdownCtx)
if err != nil {
+ if err == context.Canceled {
+ q.baseCtxCancel()
+ return err
+ }
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
- return
+ return err
}
if len(bs) == 0 {
- return
+ if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 {
+ return errQueueEmpty
+ }
+ return errEmptyBytes
}
- resetBackoff = true
-
data, err := unmarshalAs(bs, q.exemplar)
if err != nil {
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
- return
+ return errUnmarshal
}
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
q.WorkerPool.Push(data)
- success = true
- return
+ return nil
}
// Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name)
- q.lock.Lock()
select {
- case <-q.closed:
+ case <-q.shutdownCtx.Done():
+ return
default:
- close(q.closed)
}
- q.lock.Unlock()
+ q.shutdownCtxCancel()
log.Debug("%s: %s Shutdown", q.typ, q.name)
}
// IsShutdown returns a channel which is closed when this Queue is shutdown
func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} {
- return q.closed
+ return q.shutdownCtx.Done()
}
// Terminate this queue and close the queue
func (q *ByteFIFOQueue) Terminate() {
log.Trace("%s: %s Terminating", q.typ, q.name)
q.Shutdown()
- q.lock.Lock()
select {
- case <-q.terminated:
- q.lock.Unlock()
+ case <-q.terminateCtx.Done():
return
default:
}
- close(q.terminated)
- q.lock.Unlock()
if log.IsDebug() {
- log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
+ log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx))
}
+ q.terminateCtxCancel()
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
@@ -223,7 +268,7 @@ func (q *ByteFIFOQueue) Terminate() {
// IsTerminated returns a channel which is closed when this Queue is terminated
func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} {
- return q.terminated
+ return q.terminateCtx.Done()
}
var _ UniqueQueue = &ByteFIFOUniqueQueue{}
@@ -240,17 +285,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
return &ByteFIFOUniqueQueue{
ByteFIFOQueue: ByteFIFOQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- byteFIFO: byteFIFO,
- typ: typ,
- closed: make(chan struct{}),
- terminated: make(chan struct{}),
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ byteFIFO: byteFIFO,
+ typ: typ,
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
},
}, nil
}
@@ -265,5 +314,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if err != nil {
return false, err
}
- return q.byteFIFO.(UniqueByteFIFO).Has(bs)
+ return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs)
}