]> source.dussan.org Git - gitea.git/commitdiff
Exponential Backoff for ByteFIFO (#15724)
authorzeripath <art27@cantab.net>
Sat, 8 May 2021 16:29:47 +0000 (17:29 +0100)
committerGitHub <noreply@github.com>
Sat, 8 May 2021 16:29:47 +0000 (17:29 +0100)
This PR is another in the vein of queue improvements. It suggests an
exponential backoff for bytefifo queues to reduce the load from queue
polling. This will mostly be useful for redis queues.

Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
modules/queue/queue_bytefifo.go

index bc86078493307982059541a66f884c57b59981ce..fe1fb7807e831ca652d423ee6e2982380a1d044a 100644 (file)
@@ -114,43 +114,73 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()
 }
 
 func (q *ByteFIFOQueue) readToChan() {
+       // handle quick cancels
+       select {
+       case <-q.closed:
+               // tell the pool to shutdown.
+               q.cancel()
+               return
+       default:
+       }
+
+       backOffTime := time.Millisecond * 100
+       maxBackOffTime := time.Second * 3
        for {
-               select {
-               case <-q.closed:
-                       // tell the pool to shutdown.
-                       q.cancel()
-                       return
-               default:
-                       q.lock.Lock()
-                       bs, err := q.byteFIFO.Pop()
-                       if err != nil {
-                               q.lock.Unlock()
-                               log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
-                               time.Sleep(time.Millisecond * 100)
-                               continue
-                       }
+               success, resetBackoff := q.doPop()
+               if resetBackoff {
+                       backOffTime = 100 * time.Millisecond
+               }
 
-                       if len(bs) == 0 {
-                               q.lock.Unlock()
-                               time.Sleep(time.Millisecond * 100)
-                               continue
+               if success {
+                       select {
+                       case <-q.closed:
+                               // tell the pool to shutdown.
+                               q.cancel()
+                               return
+                       default:
                        }
-
-                       data, err := unmarshalAs(bs, q.exemplar)
-                       if err != nil {
-                               log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
-                               q.lock.Unlock()
-                               time.Sleep(time.Millisecond * 100)
-                               continue
+               } else {
+                       select {
+                       case <-q.closed:
+                               // tell the pool to shutdown.
+                               q.cancel()
+                               return
+                       case <-time.After(backOffTime):
+                       }
+                       backOffTime += backOffTime / 2
+                       if backOffTime > maxBackOffTime {
+                               backOffTime = maxBackOffTime
                        }
-
-                       log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
-                       q.WorkerPool.Push(data)
-                       q.lock.Unlock()
                }
        }
 }
 
+func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       bs, err := q.byteFIFO.Pop()
+       if err != nil {
+               log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
+               return
+       }
+       if len(bs) == 0 {
+               return
+       }
+
+       resetBackoff = true
+
+       data, err := unmarshalAs(bs, q.exemplar)
+       if err != nil {
+               log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
+               return
+       }
+
+       log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
+       q.WorkerPool.Push(data)
+       success = true
+       return
+}
+
 // Shutdown processing from this queue
 func (q *ByteFIFOQueue) Shutdown() {
        log.Trace("%s: %s Shutting down", q.typ, q.name)