diff options
author | zeripath <art27@cantab.net> | 2021-05-15 15:22:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-15 16:22:26 +0200 |
commit | ba526ceffe33a54b6015cdfbdc9bba920484dc23 (patch) | |
tree | ddd9ff13b0da7b272b5a60445a997319cb0de882 /modules/queue/queue_disk_channel.go | |
parent | 9f19c2b8cca9edf2ad7b8803e6ed72b1aea322a5 (diff) | |
download | gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.tar.gz gitea-ba526ceffe33a54b6015cdfbdc9bba920484dc23.zip |
Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693)
* move shutdownfns, terminatefns and hammerfns out of separate goroutines
Coalesce the shutdownfns etc into a list of functions that get run at shutdown
rather than have them run as goroutines blocked on selects.
This may help reduce the background select/poll load in certain
configurations.
* The LevelDB queues can actually wait on empty instead of polling
Slight refactor to cause leveldb queues to wait on empty instead of polling.
* Shutdown the shadow level queue once it is empty
* Remove bytefifo additional goroutine for readToChan as it can just be run in run
* Remove additional removeWorkers goroutine for workers
* Simplify the AtShutdown and AtTerminate functions and add Channel Flusher
* Add shutdown flusher to CUQ
* move persistable channel shutdown stuff to Shutdown Fn
* Ensure that UPCQ has the correct config
* handle shutdown during the flushing
* reduce risk of race between zeroBoost and addWorkers
* prevent double shutdown
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue/queue_disk_channel.go')
-rw-r--r-- | modules/queue/queue_disk_channel.go | 87 |
1 files changed, 54 insertions, 33 deletions
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 801fd8a122..c3a1c5781e 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error { } // Run starts to run the queue -func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) q.lock.Lock() if q.internal == nil { @@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - - go func() { - _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - }() + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go func() { + for !q.IsEmpty() { + _ = q.internal.Flush(0) + select { + case <-time.After(100 * time.Millisecond): + case <-q.internal.(*LevelQueue).shutdownCtx.Done(): + log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + return + } + } + log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + }() + } else { + log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + 
q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + } - log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) - q.channelQueue.cancel() - q.internal.(*LevelQueue).cancel() - log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) - q.channelQueue.Wait() - q.internal.(*LevelQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - atomic.AddInt64(&q.channelQueue.numInQueue, -1) - } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue and blocks till the queue is empty @@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool { func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() + select { case <-q.closed: + q.lock.Unlock() + return default: - if q.internal != nil { - q.internal.(*LevelQueue).Shutdown() - } - close(q.closed) - log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } + q.channelQueue.Shutdown() + if q.internal != nil { + q.internal.(*LevelQueue).Shutdown() + } + close(q.closed) + q.lock.Unlock() + + log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) + q.channelQueue.baseCtxCancel() + q.internal.(*LevelQueue).baseCtxCancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go 
func() { + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) + } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + + log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } // Terminate this queue and close the queue @@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() { q.Shutdown() q.lock.Lock() defer q.lock.Unlock() + q.channelQueue.Terminate() if q.internal != nil { q.internal.(*LevelQueue).Terminate() } |