Implements the Pausable interface on WrappedQueues and PersistableChannelUniqueQueues
Reference #15928
Signed-off-by: Andrew Thornton art27@cantab.net
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
newFn, ok := queuesMap[queueType]
if !ok {
- return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
+ return nil, fmt.Errorf("unsupported queue type: %v", queueType)
}
return newFn(handlerFunc, opts, exemplar)
}
// PushBack pushes data to the fifo
func (q *ByteFIFOQueue) PushBack(data Data) error {
if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
// PushFunc pushes data to the fifo
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
// Has checks if the provided data is in the queue
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if !assignableTo(data, q.exemplar) {
- return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ return false, fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
// Push will push data into the queue
func (q *ChannelQueue) Push(data Data) error {
if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
+ return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
q.WorkerPool.Push(data)
return nil
if s, ok := cfg.([]byte); ok {
cfg = string(s)
}
- return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
+ return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
default:
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
if err == nil {
i++
if q.maxAttempts > 0 && i > q.maxAttempts {
if bs, ok := q.cfg.([]byte); ok {
- return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
+ return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
}
- return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
+ return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
}
sleepTime := 100 * time.Millisecond
if q.timeout > 0 && q.maxAttempts > 0 {
log.Debug("WrappedQueue: %s Terminated", q.name)
}
+// IsPaused will return if the pool or queue is paused
+func (q *WrappedQueue) IsPaused() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ pausable, ok := q.internal.(Pausable)
+ return ok && pausable.IsPaused()
+}
+
+// Pause will pause the pool or queue
+func (q *WrappedQueue) Pause() {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if pausable, ok := q.internal.(Pausable); ok {
+ pausable.Pause()
+ }
+}
+
+// Resume will resume the pool or queue
+func (q *WrappedQueue) Resume() {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if pausable, ok := q.internal.(Pausable); ok {
+ pausable.Resume()
+ }
+}
+
+// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
+func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if pausable, ok := q.internal.(Pausable); ok {
+ return pausable.IsPausedIsResumed()
+ }
+ return context.Background().Done(), closedChan
+}
+
+var closedChan chan struct{}
+
func init() {
queuesMap[WrappedQueueType] = NewWrappedQueue
+ closedChan = make(chan struct{})
+ close(closedChan)
}
return typ, nil
}
}
- return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
+ return PersistableChannelQueueType, fmt.Errorf("unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
}
func getQueueSettings(name string) (setting.QueueSettings, []byte) {
// PushFunc will push data into the queue
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
+ return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
return q.channelQueue.IsEmpty()
}
+// IsPaused will return if the pool or queue is paused
+func (q *PersistableChannelUniqueQueue) IsPaused() bool {
+ return q.channelQueue.IsPaused()
+}
+
+// Pause will pause the pool or queue
+func (q *PersistableChannelUniqueQueue) Pause() {
+ q.channelQueue.Pause()
+}
+
+// Resume will resume the pool or queue
+func (q *PersistableChannelUniqueQueue) Resume() {
+ q.channelQueue.Resume()
+}
+
+// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
+func (q *PersistableChannelUniqueQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
+ return q.channelQueue.IsPausedIsResumed()
+}
+
// Shutdown processing this queue
func (q *PersistableChannelUniqueQueue) Shutdown() {
log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
// PushFunc will push the data to the internal channel checking it against the exemplar
func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
q.tlock.Lock()
ctx, cancel := context.WithCancel(context.Background())
dataChan := make(chan Data, config.QueueLength)
- resumed := make(chan struct{})
- close(resumed)
pool := &WorkerPool{
baseCtx: ctx,
baseCtxCancel: cancel,
batchLength: config.BatchLength,
dataChan: dataChan,
- resumed: resumed,
+ resumed: closedChan,
paused: make(chan struct{}),
handle: handle,
blockTimeout: config.BlockTimeout,