}
func (q *ByteFIFOQueue) readToChan() {
+ // handle quick cancels
+ select {
+ case <-q.closed:
+ // tell the pool to shutdown.
+ q.cancel()
+ return
+ default:
+ }
+
+ backOffTime := time.Millisecond * 100
+ maxBackOffTime := time.Second * 3
for {
- select {
- case <-q.closed:
- // tell the pool to shutdown.
- q.cancel()
- return
- default:
- q.lock.Lock()
- bs, err := q.byteFIFO.Pop()
- if err != nil {
- q.lock.Unlock()
- log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
+ success, resetBackoff := q.doPop()
+ if resetBackoff {
+ backOffTime = 100 * time.Millisecond
+ }
- if len(bs) == 0 {
- q.lock.Unlock()
- time.Sleep(time.Millisecond * 100)
- continue
+ if success {
+ select {
+ case <-q.closed:
+ // tell the pool to shutdown.
+ q.cancel()
+ return
+ default:
}
-
- data, err := unmarshalAs(bs, q.exemplar)
- if err != nil {
- log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
- q.lock.Unlock()
- time.Sleep(time.Millisecond * 100)
- continue
+ } else {
+ select {
+ case <-q.closed:
+ // tell the pool to shutdown.
+ q.cancel()
+ return
+ case <-time.After(backOffTime):
+ }
+ backOffTime += backOffTime / 2
+ if backOffTime > maxBackOffTime {
+ backOffTime = maxBackOffTime
}
-
- log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
- q.WorkerPool.Push(data)
- q.lock.Unlock()
}
}
}
+func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ bs, err := q.byteFIFO.Pop()
+ if err != nil {
+ log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
+ return
+ }
+ if len(bs) == 0 {
+ return
+ }
+
+ resetBackoff = true
+
+ data, err := unmarshalAs(bs, q.exemplar)
+ if err != nil {
+ log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
+ return
+ }
+
+ log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
+ q.WorkerPool.Push(data)
+ success = true
+ return
+}
+
// Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name)