diff options
Diffstat (limited to 'modules/queue/queue_disk_channel.go')
-rw-r--r-- | modules/queue/queue_disk_channel.go | 87 |
1 files changed, 54 insertions, 33 deletions
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 801fd8a122..c3a1c5781e 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error { } // Run starts to run the queue -func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) q.lock.Lock() if q.internal == nil { @@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - - go func() { - _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - }() + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go func() { + for !q.IsEmpty() { + _ = q.internal.Flush(0) + select { + case <-time.After(100 * time.Millisecond): + case <-q.internal.(*LevelQueue).shutdownCtx.Done(): + log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + return + } + } + log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + }() + } else { + log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + } - log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) - q.channelQueue.cancel() - q.internal.(*LevelQueue).cancel() - log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) - q.channelQueue.Wait() - q.internal.(*LevelQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - atomic.AddInt64(&q.channelQueue.numInQueue, -1) - } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue and blocks till the queue is empty @@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool { func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() + select { case <-q.closed: + q.lock.Unlock() + return default: - if q.internal != nil { - q.internal.(*LevelQueue).Shutdown() - } - close(q.closed) - log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } + q.channelQueue.Shutdown() + if q.internal != nil { + q.internal.(*LevelQueue).Shutdown() + } + close(q.closed) + q.lock.Unlock() + + log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) + q.channelQueue.baseCtxCancel() + q.internal.(*LevelQueue).baseCtxCancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) + } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + + log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } // Terminate this queue and close the queue @@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() { q.Shutdown() q.lock.Lock() defer q.lock.Unlock() + q.channelQueue.Terminate() if q.internal != nil { q.internal.(*LevelQueue).Terminate() } |