From ab7f70167100b69dc66f988d2df0bbd639f405be Mon Sep 17 00:00:00 2001 From: zeripath Date: Mon, 24 Jan 2022 22:54:35 +0000 Subject: Make WrappedQueues and PersistableChannelUniqueQueues Pausable (#18393) Implements the Pausable interface on WrappedQueues and PersistableChannelUniqueQueues Reference #15928 Signed-off-by: Andrew Thornton art27@cantab.net --- modules/queue/queue.go | 2 +- modules/queue/queue_bytefifo.go | 6 ++-- modules/queue/queue_channel.go | 2 +- modules/queue/queue_wrapped.go | 46 ++++++++++++++++++++++++++++-- modules/queue/setting.go | 2 +- modules/queue/unique_queue_channel.go | 2 +- modules/queue/unique_queue_disk_channel.go | 20 +++++++++++++ modules/queue/unique_queue_wrapped.go | 2 +- modules/queue/workerpool.go | 4 +-- 9 files changed, 72 insertions(+), 14 deletions(-) (limited to 'modules') diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 3a51965143..a166a935a6 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -196,7 +196,7 @@ func RegisteredTypesAsString() []string { func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) { newFn, ok := queuesMap[queueType] if !ok { - return nil, fmt.Errorf("Unsupported queue type: %v", queueType) + return nil, fmt.Errorf("unsupported queue type: %v", queueType) } return newFn(handlerFunc, opts, exemplar) } diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index 0380497ea6..7f2acf3deb 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -92,7 +92,7 @@ func (q *ByteFIFOQueue) Push(data Data) error { // PushBack pushes data to the fifo func (q *ByteFIFOQueue) PushBack(data Data) error { if !assignableTo(data, q.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) } bs, err := json.Marshal(data) if err != nil { @@ -110,7 +110,7 @@ func (q *ByteFIFOQueue) PushBack(data Data) error { // PushFunc pushes data to the fifo func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) } bs, err := json.Marshal(data) if err != nil { @@ -398,7 +398,7 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun // Has checks if the provided data is in the queue func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { if !assignableTo(data, q.exemplar) { - return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + return false, fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) } bs, err := json.Marshal(data) if err != nil { diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 7de9c17c86..105388f421 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -93,7 +93,7 @@ func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) { // Push will push data into the queue func (q *ChannelQueue) Push(data Data) error { if !assignableTo(data, q.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) + return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } q.WorkerPool.Push(data) return nil diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index edb589338a..02f7818aa8 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc if s, ok := cfg.([]byte); ok { cfg = string(s) } - return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name) + return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name) default: queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) if err == nil { @@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc i++ if q.maxAttempts > 0 && i > q.maxAttempts { if bs, ok := q.cfg.([]byte); ok { - return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err) + return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err) } - return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) + return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) } sleepTime := 100 * time.Millisecond if q.timeout > 0 && q.maxAttempts > 0 { @@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() { log.Debug("WrappedQueue: %s Terminated", q.name) } +// IsPaused will return if the pool or queue is paused +func (q *WrappedQueue) IsPaused() bool { + q.lock.Lock() + defer q.lock.Unlock() + pausable, ok := q.internal.(Pausable) + return ok && pausable.IsPaused() +} + +// Pause will pause the pool or queue +func (q *WrappedQueue) Pause() { + q.lock.Lock() + defer q.lock.Unlock() + if pausable, ok := q.internal.(Pausable); ok { + pausable.Pause() + } +} + +// Resume will resume the pool or queue +func (q *WrappedQueue) Resume() { + q.lock.Lock() + defer q.lock.Unlock() + if pausable, ok := q.internal.(Pausable); ok { + pausable.Resume() + } +} + +// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed +func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) { + q.lock.Lock() + defer q.lock.Unlock() + if pausable, ok := q.internal.(Pausable); ok { + return pausable.IsPausedIsResumed() + } + return context.Background().Done(), closedChan +} + +var closedChan chan struct{} + func init() { queuesMap[WrappedQueueType] = NewWrappedQueue + closedChan = make(chan struct{}) + close(closedChan) } diff --git a/modules/queue/setting.go b/modules/queue/setting.go index 61f156c377..880770f073 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -22,7 +22,7 @@ func validType(t string) (Type, error) { return typ, nil } } - return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType)) + return PersistableChannelQueueType, fmt.Errorf("unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType)) } func getQueueSettings(name string) (setting.QueueSettings, []byte) { diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index b6d2e770fc..59210855a1 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -111,7 +111,7 @@ func (q *ChannelUniqueQueue) Push(data Data) error { // PushFunc will push data into the queue func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) + return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } bs, err := json.Marshal(data) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 7fc304b17e..ac7919926f 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -239,6 +239,26 @@ func (q *PersistableChannelUniqueQueue) IsEmpty() bool { return q.channelQueue.IsEmpty() } +// IsPaused will return if the pool or queue is paused +func (q *PersistableChannelUniqueQueue) IsPaused() bool { + return q.channelQueue.IsPaused() +} + +// Pause will pause the pool or queue +func (q *PersistableChannelUniqueQueue) Pause() { + q.channelQueue.Pause() +} + +// Resume will resume the pool or queue +func (q *PersistableChannelUniqueQueue) Resume() { + q.channelQueue.Resume() +} + +// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed +func (q *PersistableChannelUniqueQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) { + return q.channelQueue.IsPausedIsResumed() +} + // Shutdown processing this queue func (q *PersistableChannelUniqueQueue) Shutdown() { log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name) diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go index 32fa9ed970..5245a35f77 100644 --- a/modules/queue/unique_queue_wrapped.go +++ b/modules/queue/unique_queue_wrapped.go @@ -105,7 +105,7 @@ func (q *WrappedUniqueQueue) Push(data Data) error { // PushFunc will push the data to the internal channel checking it against the exemplar func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) } q.tlock.Lock() diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index da56216dcb..30dc8073c9 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -57,14 +57,12 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo ctx, cancel := context.WithCancel(context.Background()) dataChan := make(chan Data, config.QueueLength) - resumed := make(chan struct{}) - close(resumed) pool := &WorkerPool{ baseCtx: ctx, baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, - resumed: resumed, + resumed: closedChan, paused: make(chan struct{}), handle: handle, blockTimeout: config.BlockTimeout, -- cgit v1.2.3