summaryrefslogtreecommitdiffstats
path: root/modules/queue/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/manager.go')
-rw-r--r--modules/queue/manager.go56
1 files changed, 55 insertions, 1 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index e0384d15a3..56298a3e00 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -54,6 +54,18 @@ type Flushable interface {
IsEmpty() bool
}
+// Pausable represents a pool or queue that is Pausable
+type Pausable interface {
+ // IsPaused will return if the pool or queue is paused
+ IsPaused() bool
+ // Pause will pause the pool or queue
+ Pause()
+ // Resume will resume the pool or queue
+ Resume()
+ // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
+ IsPausedIsResumed() (paused, resumed <-chan struct{})
+}
+
// ManagedPool is a simple interface to get certain details from a worker pool
type ManagedPool interface {
// AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
@@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Done()
continue
}
+ if pausable, ok := mq.Managed.(Pausable); ok {
+ // no point flushing paused queues
+ if pausable.IsPaused() {
+ wg.Done()
+ continue
+ }
+ }
+
allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
log.Debug("Flushing (flushable) queue: %s", mq.Name)
@@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
log.Debug("All queues are empty")
break
}
- // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign
+ // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
// but don't delay cancellation here.
select {
case <-ctx.Done():
@@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can
return nil
}
+// Flushable returns true if the queue is flushable
+func (q *ManagedQueue) Flushable() bool {
+ _, ok := q.Managed.(Flushable)
+ return ok
+}
+
// Flush flushes the queue with a timeout
func (q *ManagedQueue) Flush(timeout time.Duration) error {
if flushable, ok := q.Managed.(Flushable); ok {
@@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool {
return true
}
+// Pausable returns whether the queue is Pausable
+func (q *ManagedQueue) Pausable() bool {
+ _, ok := q.Managed.(Pausable)
+ return ok
+}
+
+// Pause pauses the queue
+func (q *ManagedQueue) Pause() {
+ if pausable, ok := q.Managed.(Pausable); ok {
+ pausable.Pause()
+ }
+}
+
+// IsPaused reveals if the queue is paused
+func (q *ManagedQueue) IsPaused() bool {
+ if pausable, ok := q.Managed.(Pausable); ok {
+ return pausable.IsPaused()
+ }
+ return false
+}
+
+// Resume resumes the queue
+func (q *ManagedQueue) Resume() {
+ if pausable, ok := q.Managed.(Pausable); ok {
+ pausable.Resume()
+ }
+}
+
// NumberOfWorkers returns the number of workers in the queue
func (q *ManagedQueue) NumberOfWorkers() int {
if pool, ok := q.Managed.(ManagedPool); ok {