aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/manager.go')
-rw-r--r--modules/queue/manager.go9
1 files changed, 6 insertions, 3 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 8b964c0c28..079e2bee7a 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -5,6 +5,7 @@ package queue
import (
"context"
+ "errors"
"sync"
"time"
@@ -32,6 +33,7 @@ type ManagedWorkerPoolQueue interface {
// FlushWithContext tries to make the handler process all items in the queue synchronously.
// It is for testing purpose only. It's not designed to be used in a cluster.
+ // Negative timeout means discarding all items in the queue.
FlushWithContext(ctx context.Context, timeout time.Duration) error
// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
@@ -76,15 +78,16 @@ func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
// It is for testing purpose only. It's not designed to be used in a cluster.
+// Negative timeout means discarding all items in the queue.
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
- var finalErr error
+ var finalErrors []error
qs := m.ManagedQueues()
for _, q := range qs {
if err := q.FlushWithContext(ctx, timeout); err != nil {
- finalErr = err // TODO: in Go 1.20: errors.Join
+ finalErrors = append(finalErrors, err)
}
}
- return finalErr
+ return errors.Join(finalErrors...)
}
// CreateSimpleQueue creates a simple queue from global setting config provider by name