diff options
Diffstat (limited to 'modules/queue/manager.go')
-rw-r--r-- | modules/queue/manager.go | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 8b964c0c28..079e2bee7a 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -5,6 +5,7 @@ package queue import ( "context" + "errors" "sync" "time" @@ -32,6 +33,7 @@ type ManagedWorkerPoolQueue interface { // FlushWithContext tries to make the handler process all items in the queue synchronously. // It is for testing purpose only. It's not designed to be used in a cluster. + // Negative timeout means discarding all items in the queue. FlushWithContext(ctx context.Context, timeout time.Duration) error // RemoveAllItems removes all items in the base queue (on-the-fly items are not affected) @@ -76,15 +78,16 @@ func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue { // FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty. // It is for testing purpose only. It's not designed to be used in a cluster. +// Negative timeout means discarding all items in the queue. func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error { - var finalErr error + var finalErrors []error qs := m.ManagedQueues() for _, q := range qs { if err := q.FlushWithContext(ctx, timeout); err != nil { - finalErr = err // TODO: in Go 1.20: errors.Join + finalErrors = append(finalErrors, err) } } - return finalErr + return errors.Join(finalErrors...) } // CreateSimpleQueue creates a simple queue from global setting config provider by name |