|
|
@@ -183,17 +183,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error |
|
|
|
} |
|
|
|
allEmpty = false |
|
|
|
if flushable, ok := mq.Managed.(Flushable); ok { |
|
|
|
go func() { |
|
|
|
go func(q *ManagedQueue) { |
|
|
|
localCtx, localCancel := context.WithCancel(ctx) |
|
|
|
pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) |
|
|
|
pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) |
|
|
|
err := flushable.FlushWithContext(localCtx) |
|
|
|
if err != nil && err != ctx.Err() { |
|
|
|
cancel() |
|
|
|
} |
|
|
|
mq.CancelWorkers(pid) |
|
|
|
q.CancelWorkers(pid) |
|
|
|
localCancel() |
|
|
|
wg.Done() |
|
|
|
}() |
|
|
|
}(mq) |
|
|
|
} else { |
|
|
|
wg.Done() |
|
|
|
} |