diff options
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/manager.go | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go index d44007a0f0..da0fc606e6 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -174,6 +174,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error default: } mqs := m.ManagedQueues() + log.Debug("Found %d Managed Queues", len(mqs)) wg := sync.WaitGroup{} wg.Add(len(mqs)) allEmpty := true @@ -184,6 +185,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error } allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { + log.Debug("Flushing (flushable) queue: %s", mq.Name) go func(q *ManagedQueue) { localCtx, localCancel := context.WithCancel(ctx) pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) @@ -196,7 +198,11 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Done() }(mq) } else { - wg.Done() + log.Debug("Queue: %s is non-empty but is not flushable - adding 100 millisecond wait", mq.Name) + go func() { + <-time.After(100 * time.Millisecond) + wg.Done() + }() } } |