diff options
author | zeripath <art27@cantab.net> | 2021-05-15 15:22:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-15 16:22:26 +0200 |
commit | ba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch) | |
tree | ddd9ff13b0da7b272b5a60445a997319cb0de882 /modules/queue/queue_bytefifo.go | |
parent | 9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff) | |
download | gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip |
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines
Coalesce the shutdownfns etc into a list of functions that get run at shutdown
rather then have them run at goroutines blocked on selects.
This may help reduce the background select/poll load in certain
configurations.
* The LevelDB queues can actually wait on empty instead of polling
Slight refactor to cause leveldb queues to wait on empty instead of polling.
* Shutdown the shadow level queue once it is empty
* Remove bytefifo additional goroutine for readToChan as it can just be run in run
* Remove additional removeWorkers goroutine for workers
* Simplify the AtShutdown and AtTerminate functions and add Channel Flusher
* Add shutdown flusher to CUQ
* move persistable channel shutdown stuff to Shutdown Fn
* Ensure that UPCQ has the correct config
* handle shutdown during the flushing
* reduce risk of race between zeroBoost and addWorkers
* prevent double shutdown
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue/queue_bytefifo.go')
-rw-r--r-- | modules/queue/queue_bytefifo.go | 217 |
1 files changed, 133 insertions, 84 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index fe1fb7807e..3ea61aad0e 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -17,8 +17,9 @@ import ( // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue type ByteFIFOQueueConfiguration struct { WorkerPoolConfiguration - Workers int - Name string + Workers int + Name string + WaitOnEmpty bool } var _ Queue = &ByteFIFOQueue{} @@ -26,14 +27,18 @@ var _ Queue = &ByteFIFOQueue{} // ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool type ByteFIFOQueue struct { *WorkerPool - byteFIFO ByteFIFO - typ Type - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex + byteFIFO ByteFIFO + typ Type + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string + lock sync.Mutex + waitOnEmpty bool + pushed chan struct{} } // NewByteFIFOQueue creates a new ByteFIFOQueue @@ -44,15 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + waitOnEmpty: config.WaitOnEmpty, + pushed: make(chan struct{}, 1), }, nil } @@ -76,7 +88,15 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } - return q.byteFIFO.PushFunc(bs, fn) + if q.waitOnEmpty { + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + } + return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } // IsEmpty checks if the queue is empty @@ -86,135 +106,160 @@ func (q *ByteFIFOQueue) IsEmpty() bool { if !q.WorkerPool.IsEmpty() { return false } - return q.byteFIFO.Len() == 0 + return q.byteFIFO.Len(q.terminateCtx) == 0 } // Run runs the bytefifo queue -func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) +func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("%s: %s Starting", q.typ, q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) - go q.readToChan() + log.Trace("%s: %s Now running", q.typ, q.name) + q.readToChan() - log.Trace("%s: %s Waiting til closed", q.typ, q.name) - <-q.closed + <-q.shutdownCtx.Done() log.Trace("%s: %s Waiting til done", q.typ, q.name) q.Wait() log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - q.CleanUp(ctx) - cancel() + q.CleanUp(q.terminateCtx) + q.terminateCtxCancel() } +const maxBackOffTime = time.Second * 3 + func (q *ByteFIFOQueue) readToChan() { // handle quick cancels select { - case <-q.closed: + case <-q.shutdownCtx.Done(): // tell the pool to shutdown. - q.cancel() + q.baseCtxCancel() return default: } + // Default backoff values backOffTime := time.Millisecond * 100 - maxBackOffTime := time.Second * 3 - for { - success, resetBackoff := q.doPop() - if resetBackoff { - backOffTime = 100 * time.Millisecond - } - if success { +loop: + for { + err := q.doPop() + if err == errQueueEmpty { + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() + case <-q.pushed: + // reset backOffTime + backOffTime = 100 * time.Millisecond + continue loop + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown whilst waiting + // Make sure the worker pool is shutdown too + q.baseCtxCancel() return - default: } - } else { + } + + // Reset the backOffTime if there is no error or an unmarshalError + if err == nil || err == errUnmarshal { + backOffTime = 100 * time.Millisecond + } + + if err != nil { + // Need to Backoff select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown whilst backing off + // Make sure the worker pool is shutdown too + q.baseCtxCancel() return case <-time.After(backOffTime): - } - backOffTime += backOffTime / 2 - if backOffTime > maxBackOffTime { - backOffTime = maxBackOffTime + // OK we've waited - so backoff a bit + backOffTime += backOffTime / 2 + if backOffTime > maxBackOffTime { + backOffTime = maxBackOffTime + } + continue loop } } + select { + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown + // Make sure the worker pool is shutdown too + q.baseCtxCancel() + return + default: + continue loop + } } } -func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) { +var errQueueEmpty = fmt.Errorf("empty queue") +var errEmptyBytes = fmt.Errorf("empty bytes") +var errUnmarshal = fmt.Errorf("failed to unmarshal") + +func (q *ByteFIFOQueue) doPop() error { q.lock.Lock() defer q.lock.Unlock() - bs, err := q.byteFIFO.Pop() + bs, err := q.byteFIFO.Pop(q.shutdownCtx) if err != nil { + if err == context.Canceled { + q.baseCtxCancel() + return err + } log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) - return + return err } if len(bs) == 0 { - return + if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 { + return errQueueEmpty + } + return errEmptyBytes } - resetBackoff = true - data, err := unmarshalAs(bs, q.exemplar) if err != nil { log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) - return + return errUnmarshal } log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) q.WorkerPool.Push(data) - success = true - return + return nil } // Shutdown processing from this queue func (q *ByteFIFOQueue) Shutdown() { log.Trace("%s: %s Shutting down", q.typ, q.name) - q.lock.Lock() select { - case <-q.closed: + case <-q.shutdownCtx.Done(): + return default: - close(q.closed) } - q.lock.Unlock() + q.shutdownCtxCancel() log.Debug("%s: %s Shutdown", q.typ, q.name) } // IsShutdown returns a channel which is closed when this Queue is shutdown func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} { - return q.closed + return q.shutdownCtx.Done() } // Terminate this queue and close the queue func (q *ByteFIFOQueue) Terminate() { log.Trace("%s: %s Terminating", q.typ, q.name) q.Shutdown() - q.lock.Lock() select { - case <-q.terminated: - q.lock.Unlock() + case <-q.terminateCtx.Done(): return default: } - close(q.terminated) - q.lock.Unlock() if log.IsDebug() { - log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) + log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx)) } + q.terminateCtxCancel() if err := q.byteFIFO.Close(); err != nil { log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) } @@ -223,7 +268,7 @@ func (q *ByteFIFOQueue) Terminate() { // IsTerminated returns a channel which is closed when this Queue is terminated func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} { - return q.terminated + return q.terminateCtx.Done() } var _ UniqueQueue = &ByteFIFOUniqueQueue{} @@ -240,17 +285,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun return nil, err } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) return &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, }, }, nil } @@ -265,5 +314,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { if err != nil { return false, err } - return q.byteFIFO.(UniqueByteFIFO).Has(bs) + return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs) } |