summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_bytefifo.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/queue_bytefifo.go')
-rw-r--r--modules/queue/queue_bytefifo.go146
1 files changed, 118 insertions, 28 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index c4d5d20a89..0380497ea6 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -8,10 +8,12 @@ import (
"context"
"fmt"
"sync"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/util"
)
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
@@ -52,8 +54,7 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
- return &ByteFIFOQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ q := &ByteFIFOQueue{
byteFIFO: byteFIFO,
typ: typ,
shutdownCtx: shutdownCtx,
@@ -65,7 +66,17 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
name: config.Name,
waitOnEmpty: config.WaitOnEmpty,
pushed: make(chan struct{}, 1),
- }, nil
+ }
+ q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
+ for _, unhandled := range handle(data...) {
+ if fail := q.PushBack(unhandled); fail != nil {
+ failed = append(failed, fail)
+ }
+ }
+ return
+ }, config.WorkerPoolConfiguration)
+
+ return q, nil
}
// Name returns the name of this queue
@@ -78,6 +89,24 @@ func (q *ByteFIFOQueue) Push(data Data) error {
return q.PushFunc(data, nil)
}
+// PushBack pushes data to the fifo
+func (q *ByteFIFOQueue) PushBack(data Data) error {
+ if !assignableTo(data, q.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ }
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ select {
+ case q.pushed <- struct{}{}:
+ default:
+ }
+ }()
+ return q.byteFIFO.PushBack(q.terminateCtx, bs)
+}
+
// PushFunc pushes data to the fifo
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
@@ -87,14 +116,12 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if err != nil {
return err
}
- if q.waitOnEmpty {
- defer func() {
- select {
- case q.pushed <- struct{}{}:
- default:
- }
- }()
- }
+ defer func() {
+ select {
+ case q.pushed <- struct{}{}:
+ default:
+ }
+ }()
return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
}
@@ -108,6 +135,15 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
return q.byteFIFO.Len(q.terminateCtx) == 0
}
+// Flush flushes the ByteFIFOQueue
+func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
+ select {
+ case q.pushed <- struct{}{}:
+ default:
+ }
+ return q.WorkerPool.Flush(timeout)
+}
+
// Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(q.Shutdown)
@@ -142,31 +178,67 @@ func (q *ByteFIFOQueue) readToChan() {
// Default backoff values
backOffTime := time.Millisecond * 100
+ backOffTimer := time.NewTimer(0)
+ util.StopTimer(backOffTimer)
+
+ paused, _ := q.IsPausedIsResumed()
loop:
for {
- err := q.doPop()
- if err == errQueueEmpty {
- log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
+ select {
+ case <-paused:
+ log.Trace("Queue %s pausing", q.name)
+ _, resumed := q.IsPausedIsResumed()
+
select {
- case <-q.pushed:
- // reset backOffTime
- backOffTime = 100 * time.Millisecond
- continue loop
+ case <-resumed:
+ paused, _ = q.IsPausedIsResumed()
+ log.Trace("Queue %s resuming", q.name)
+ if q.HasNoWorkerScaling() {
+ log.Warn(
+ "Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
+ "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", q.name)
+ q.Pause()
+ continue loop
+ }
case <-q.shutdownCtx.Done():
- // Oops we've been shutdown whilst waiting
- // Make sure the worker pool is shutdown too
+ // tell the pool to shutdown.
q.baseCtxCancel()
return
+ case data := <-q.dataChan:
+ if err := q.PushBack(data); err != nil {
+ log.Error("Unable to push back data into queue %s", q.name)
+ }
+ atomic.AddInt64(&q.numInQueue, -1)
}
+ default:
}
- // Reset the backOffTime if there is no error or an unmarshalError
- if err == nil || err == errUnmarshal {
- backOffTime = 100 * time.Millisecond
+ // empty the pushed channel
+ select {
+ case <-q.pushed:
+ default:
}
+ err := q.doPop()
+
+ util.StopTimer(backOffTimer)
+
if err != nil {
+ if err == errQueueEmpty && q.waitOnEmpty {
+ log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
+
+ // reset the backoff time but don't set the timer
+ backOffTime = 100 * time.Millisecond
+ } else if err == errUnmarshal {
+ // reset the timer and backoff
+ backOffTime = 100 * time.Millisecond
+ backOffTimer.Reset(backOffTime)
+ } else {
+ // backoff
+ backOffTimer.Reset(backOffTime)
+ }
+
// Need to Backoff
select {
case <-q.shutdownCtx.Done():
@@ -174,8 +246,13 @@ loop:
// Make sure the worker pool is shutdown too
q.baseCtxCancel()
return
- case <-time.After(backOffTime):
- // OK we've waited - so backoff a bit
+ case <-q.pushed:
+ // Data has been pushed to the fifo (or flush has been called)
+ // reset the backoff time
+ backOffTime = 100 * time.Millisecond
+ continue loop
+ case <-backOffTimer.C:
+ // Calculate the next backoff time
backOffTime += backOffTime / 2
if backOffTime > maxBackOffTime {
backOffTime = maxBackOffTime
@@ -183,6 +260,10 @@ loop:
continue loop
}
}
+
+ // Reset the backoff time
+ backOffTime = 100 * time.Millisecond
+
select {
case <-q.shutdownCtx.Done():
// Oops we've been shutdown
@@ -289,9 +370,8 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
- return &ByteFIFOUniqueQueue{
+ q := &ByteFIFOUniqueQueue{
ByteFIFOQueue: ByteFIFOQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO,
typ: typ,
shutdownCtx: shutdownCtx,
@@ -302,7 +382,17 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
workers: config.Workers,
name: config.Name,
},
- }, nil
+ }
+ q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
+ for _, unhandled := range handle(data...) {
+ if fail := q.PushBack(unhandled); fail != nil {
+ failed = append(failed, fail)
+ }
+ }
+ return
+ }, config.WorkerPoolConfiguration)
+
+ return q, nil
}
// Has checks if the provided data is in the queue