|
|
@@ -198,17 +198,20 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error |
|
|
|
wg.Done() |
|
|
|
}(mq) |
|
|
|
} else { |
|
|
|
log.Debug("Queue: %s is non-empty but is not flushable - adding 100 millisecond wait", mq.Name) |
|
|
|
go func() { |
|
|
|
<-time.After(100 * time.Millisecond) |
|
|
|
wg.Done() |
|
|
|
}() |
|
|
|
log.Debug("Queue: %s is non-empty but is not flushable", mq.Name) |
|
|
|
wg.Done() |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
if allEmpty { |
|
|
|
log.Debug("All queues are empty") |
|
|
|
break |
|
|
|
} |
|
|
|
// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign |
|
|
|
// but don't delay cancellation here. |
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
case <-time.After(100 * time.Millisecond): |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
} |
|
|
|
return nil |