diff options
author | zeripath <art27@cantab.net> | 2022-01-24 22:54:35 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-24 22:54:35 +0000 |
commit | ab7f70167100b69dc66f988d2df0bbd639f405be (patch) | |
tree | 26bb6c394a4ee46db9330d6226fe3a8c6b9382c1 /modules/queue/queue_wrapped.go | |
parent | 43c6b27716381dad9debd2ebbd9e1ae332af23b2 (diff) | |
download | gitea-ab7f70167100b69dc66f988d2df0bbd639f405be.tar.gz gitea-ab7f70167100b69dc66f988d2df0bbd639f405be.zip |
Make WrappedQueues and PersistableChannelUniqueQueues Pausable (#18393)
Implements the Pausable interface on WrappedQueues and PersistableChannelUniqueQueues
Reference #15928
Signed-off-by: Andrew Thornton art27@cantab.net
Diffstat (limited to 'modules/queue/queue_wrapped.go')
-rw-r--r-- | modules/queue/queue_wrapped.go | 46 |
1 files changed, 43 insertions, 3 deletions
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index edb589338a..02f7818aa8 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc if s, ok := cfg.([]byte); ok { cfg = string(s) } - return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name) + return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name) default: queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) if err == nil { @@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc i++ if q.maxAttempts > 0 && i > q.maxAttempts { if bs, ok := q.cfg.([]byte); ok { - return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err) + return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err) } - return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) + return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) } sleepTime := 100 * time.Millisecond if q.timeout > 0 && q.maxAttempts > 0 { @@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() { log.Debug("WrappedQueue: %s Terminated", q.name) } +// IsPaused will return if the pool or queue is paused +func (q *WrappedQueue) IsPaused() bool { + q.lock.Lock() + defer q.lock.Unlock() + pausable, ok := q.internal.(Pausable) + return ok && pausable.IsPaused() +} + +// Pause will pause the pool or queue +func (q *WrappedQueue) Pause() { + q.lock.Lock() + defer q.lock.Unlock() + if pausable, ok := q.internal.(Pausable); ok { + pausable.Pause() + } +} + +// Resume will resume the pool or queue +func (q *WrappedQueue) Resume() { + q.lock.Lock() + defer q.lock.Unlock() + if pausable, ok := q.internal.(Pausable); ok { + pausable.Resume() + } +} + +// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed +func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) { + q.lock.Lock() + defer q.lock.Unlock() + if pausable, ok := q.internal.(Pausable); ok { + return pausable.IsPausedIsResumed() + } + return context.Background().Done(), closedChan +} + +var closedChan chan struct{} + func init() { queuesMap[WrappedQueueType] = NewWrappedQueue + closedChan = make(chan struct{}) + close(closedChan) } |