aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_bytefifo.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2021-05-08 17:29:47 +0100
committerGitHub <noreply@github.com>2021-05-08 17:29:47 +0100
commite22ee468cf0e2d751baa859a643453a59f3c7896 (patch)
tree8afc3929fc498365dd15ac18f17174e6bd1d5422 /modules/queue/queue_bytefifo.go
parent2a9b8d173a286a7155306d7d1df6328d1eb98199 (diff)
downloadgitea-e22ee468cf0e2d751baa859a643453a59f3c7896.tar.gz
gitea-e22ee468cf0e2d751baa859a643453a59f3c7896.zip
Exponential Backoff for ByteFIFO (#15724)
This PR is another in the vein of queue improvements. It suggests an exponential backoff for bytefifo queues to reduce the load from queue polling. This will mostly be useful for redis queues. Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue/queue_bytefifo.go')
-rw-r--r--modules/queue/queue_bytefifo.go88
1 files changed, 59 insertions, 29 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index bc86078493..fe1fb7807e 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -114,43 +114,73 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()
}
func (q *ByteFIFOQueue) readToChan() {
+ // handle quick cancels
+ select {
+ case <-q.closed:
+ // tell the pool to shutdown.
+ q.cancel()
+ return
+ default:
+ }
+
+ backOffTime := time.Millisecond * 100
+ maxBackOffTime := time.Second * 3
for {
- select {
- case <-q.closed:
- // tell the pool to shutdown.
- q.cancel()
- return
- default:
- q.lock.Lock()
- bs, err := q.byteFIFO.Pop()
- if err != nil {
- q.lock.Unlock()
- log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
+ success, resetBackoff := q.doPop()
+ if resetBackoff {
+ backOffTime = 100 * time.Millisecond
+ }
- if len(bs) == 0 {
- q.lock.Unlock()
- time.Sleep(time.Millisecond * 100)
- continue
+ if success {
+ select {
+ case <-q.closed:
+ // tell the pool to shutdown.
+ q.cancel()
+ return
+ default:
}
-
- data, err := unmarshalAs(bs, q.exemplar)
- if err != nil {
- log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
- q.lock.Unlock()
- time.Sleep(time.Millisecond * 100)
- continue
+ } else {
+ select {
+ case <-q.closed:
+ // tell the pool to shutdown.
+ q.cancel()
+ return
+ case <-time.After(backOffTime):
+ }
+ backOffTime += backOffTime / 2
+ if backOffTime > maxBackOffTime {
+ backOffTime = maxBackOffTime
}
-
- log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
- q.WorkerPool.Push(data)
- q.lock.Unlock()
}
}
}
+func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ bs, err := q.byteFIFO.Pop()
+ if err != nil {
+ log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
+ return
+ }
+ if len(bs) == 0 {
+ return
+ }
+
+ resetBackoff = true
+
+ data, err := unmarshalAs(bs, q.exemplar)
+ if err != nil {
+ log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
+ return
+ }
+
+ log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
+ q.WorkerPool.Push(data)
+ success = true
+ return
+}
+
// Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name)