summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_disk_channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/queue_disk_channel.go')
-rw-r--r--modules/queue/queue_disk_channel.go87
1 files changed, 54 insertions, 33 deletions
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 801fd8a122..c3a1c5781e 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error {
}
// Run starts to run the queue
-func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
+ _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
q.lock.Lock()
if q.internal == nil {
@@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
} else {
q.lock.Unlock()
}
- atShutdown(context.Background(), q.Shutdown)
- atTerminate(context.Background(), q.Terminate)
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
- // Just run the level queue - we shut it down later
- go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
-
- go func() {
- _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
- }()
+ if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
+ // Just run the level queue - we shut it down once it's flushed
+ go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+ go func() {
+ for !q.IsEmpty() {
+ _ = q.internal.Flush(0)
+ select {
+ case <-time.After(100 * time.Millisecond):
+ case <-q.internal.(*LevelQueue).shutdownCtx.Done():
+ log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
+ return
+ }
+ }
+ log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
+ q.internal.(*LevelQueue).Shutdown()
+ GetManager().Remove(q.internal.(*LevelQueue).qid)
+ }()
+ } else {
+ log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+ q.internal.(*LevelQueue).Shutdown()
+ GetManager().Remove(q.internal.(*LevelQueue).qid)
+ }
- log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name)
- <-q.closed
- log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
- q.channelQueue.cancel()
- q.internal.(*LevelQueue).cancel()
- log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
- q.channelQueue.Wait()
- q.internal.(*LevelQueue).Wait()
- // Redirect all remaining data in the chan to the internal channel
- go func() {
- log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
- for data := range q.channelQueue.dataChan {
- _ = q.internal.Push(data)
- atomic.AddInt64(&q.channelQueue.numInQueue, -1)
- }
- log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
- }()
- log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name)
}
// Flush flushes the queue and blocks till the queue is empty
@@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool {
func (q *PersistableChannelQueue) Shutdown() {
log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
q.lock.Lock()
- defer q.lock.Unlock()
+
select {
case <-q.closed:
+ q.lock.Unlock()
+ return
default:
- if q.internal != nil {
- q.internal.(*LevelQueue).Shutdown()
- }
- close(q.closed)
- log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
+ q.channelQueue.Shutdown()
+ if q.internal != nil {
+ q.internal.(*LevelQueue).Shutdown()
+ }
+ close(q.closed)
+ q.lock.Unlock()
+
+ log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
+ q.channelQueue.baseCtxCancel()
+ q.internal.(*LevelQueue).baseCtxCancel()
+ log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
+ q.channelQueue.Wait()
+ q.internal.(*LevelQueue).Wait()
+ // Redirect all remaining data in the chan to the internal channel
+ go func() {
+ log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+ }
+ log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
+ }()
+
+ log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
// Terminate this queue and close the queue
@@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() {
q.Shutdown()
q.lock.Lock()
defer q.lock.Unlock()
+ q.channelQueue.Terminate()
if q.internal != nil {
q.internal.(*LevelQueue).Terminate()
}