summaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/manager.go8
1 files changed, 7 insertions, 1 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index d44007a0f0..da0fc606e6 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -174,6 +174,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
default:
}
mqs := m.ManagedQueues()
+ log.Debug("Found %d Managed Queues", len(mqs))
wg := sync.WaitGroup{}
wg.Add(len(mqs))
allEmpty := true
@@ -184,6 +185,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
}
allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
+ log.Debug("Flushing (flushable) queue: %s", mq.Name)
go func(q *ManagedQueue) {
localCtx, localCancel := context.WithCancel(ctx)
pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
@@ -196,7 +198,11 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Done()
}(mq)
} else {
- wg.Done()
+ log.Debug("Queue: %s is non-empty but is not flushable - adding 100 millisecond wait", mq.Name)
+ go func() {
+ <-time.After(100 * time.Millisecond)
+ wg.Done()
+ }()
}
}