summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_wrapped.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-01-24 22:54:35 +0000
committerGitHub <noreply@github.com>2022-01-24 22:54:35 +0000
commitab7f70167100b69dc66f988d2df0bbd639f405be (patch)
tree26bb6c394a4ee46db9330d6226fe3a8c6b9382c1 /modules/queue/queue_wrapped.go
parent43c6b27716381dad9debd2ebbd9e1ae332af23b2 (diff)
downloadgitea-ab7f70167100b69dc66f988d2df0bbd639f405be.tar.gz
gitea-ab7f70167100b69dc66f988d2df0bbd639f405be.zip
Make WrappedQueues and PersistableChannelUniqueQueues Pausable (#18393)
Implements the Pausable interface on WrappedQueues and PersistableChannelUniqueQueues Reference #15928 Signed-off-by: Andrew Thornton art27@cantab.net
Diffstat (limited to 'modules/queue/queue_wrapped.go')
-rw-r--r--modules/queue/queue_wrapped.go46
1 files changed, 43 insertions, 3 deletions
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go
index edb589338a..02f7818aa8 100644
--- a/modules/queue/queue_wrapped.go
+++ b/modules/queue/queue_wrapped.go
@@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
if s, ok := cfg.([]byte); ok {
cfg = string(s)
}
- return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
+ return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
default:
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
if err == nil {
@@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
i++
if q.maxAttempts > 0 && i > q.maxAttempts {
if bs, ok := q.cfg.([]byte); ok {
- return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
+ return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
}
- return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
+ return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
}
sleepTime := 100 * time.Millisecond
if q.timeout > 0 && q.maxAttempts > 0 {
@@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() {
log.Debug("WrappedQueue: %s Terminated", q.name)
}
+// IsPaused will return if the pool or queue is paused
+func (q *WrappedQueue) IsPaused() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ pausable, ok := q.internal.(Pausable)
+ return ok && pausable.IsPaused()
+}
+
+// Pause will pause the pool or queue
+func (q *WrappedQueue) Pause() {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if pausable, ok := q.internal.(Pausable); ok {
+ pausable.Pause()
+ }
+}
+
+// Resume will resume the pool or queue
+func (q *WrappedQueue) Resume() {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if pausable, ok := q.internal.(Pausable); ok {
+ pausable.Resume()
+ }
+}
+
+// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
+func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if pausable, ok := q.internal.(Pausable); ok {
+ return pausable.IsPausedIsResumed()
+ }
+ return context.Background().Done(), closedChan
+}
+
+var closedChan chan struct{}
+
func init() {
queuesMap[WrappedQueueType] = NewWrappedQueue
+ closedChan = make(chan struct{})
+ close(closedChan)
}