diff options
Diffstat (limited to 'modules/queue/manager.go')
-rw-r--r-- | modules/queue/manager.go | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go index e0384d15a3..56298a3e00 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -54,6 +54,18 @@ type Flushable interface { IsEmpty() bool } +// Pausable represents a pool or queue that is Pausable +type Pausable interface { + // IsPaused will return if the pool or queue is paused + IsPaused() bool + // Pause will pause the pool or queue + Pause() + // Resume will resume the pool or queue + Resume() + // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed + IsPausedIsResumed() (paused, resumed <-chan struct{}) +} + // ManagedPool is a simple interface to get certain details from a worker pool type ManagedPool interface { // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group @@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Done() continue } + if pausable, ok := mq.Managed.(Pausable); ok { + // no point flushing paused queues + if pausable.IsPaused() { + wg.Done() + continue + } + } + allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) @@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error log.Debug("All queues are empty") break } - // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign + // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing // but don't delay cancellation here. select { case <-ctx.Done(): @@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can return nil } +// Flushable returns true if the queue is flushable +func (q *ManagedQueue) Flushable() bool { + _, ok := q.Managed.(Flushable) + return ok +} + // Flush flushes the queue with a timeout func (q *ManagedQueue) Flush(timeout time.Duration) error { if flushable, ok := q.Managed.(Flushable); ok { @@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool { return true } +// Pausable returns whether the queue is Pausable +func (q *ManagedQueue) Pausable() bool { + _, ok := q.Managed.(Pausable) + return ok +} + +// Pause pauses the queue +func (q *ManagedQueue) Pause() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Pause() + } +} + +// IsPaused reveals if the queue is paused +func (q *ManagedQueue) IsPaused() bool { + if pausable, ok := q.Managed.(Pausable); ok { + return pausable.IsPaused() + } + return false +} + +// Resume resumes the queue +func (q *ManagedQueue) Resume() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Resume() + } +} + // NumberOfWorkers returns the number of workers in the queue func (q *ManagedQueue) NumberOfWorkers() int { if pool, ok := q.Managed.(ManagedPool); ok { |