diff options
Diffstat (limited to 'modules/queue/queue_bytefifo.go')
-rw-r--r-- | modules/queue/queue_bytefifo.go | 146 |
1 files changed, 118 insertions, 28 deletions
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index c4d5d20a89..0380497ea6 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -8,10 +8,12 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" ) // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue @@ -52,8 +54,7 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + q := &ByteFIFOQueue{ byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -65,7 +66,17 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem name: config.Name, waitOnEmpty: config.WaitOnEmpty, pushed: make(chan struct{}, 1), - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Name returns the name of this queue @@ -78,6 +89,24 @@ func (q *ByteFIFOQueue) Push(data Data) error { return q.PushFunc(data, nil) } +// PushBack pushes data to the fifo +func (q *ByteFIFOQueue) PushBack(data Data) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + return q.byteFIFO.PushBack(q.terminateCtx, bs) +} + // PushFunc pushes data to the fifo func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { @@ -87,14 +116,12 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } - if q.waitOnEmpty { - defer func() { - select { - case q.pushed <- struct{}{}: - default: - } - }() - } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } @@ -108,6 +135,15 @@ func (q *ByteFIFOQueue) IsEmpty() bool { return q.byteFIFO.Len(q.terminateCtx) == 0 } +// Flush flushes the ByteFIFOQueue +func (q *ByteFIFOQueue) Flush(timeout time.Duration) error { + select { + case q.pushed <- struct{}{}: + default: + } + return q.WorkerPool.Flush(timeout) +} + // Run runs the bytefifo queue func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) @@ -142,31 +178,67 @@ func (q *ByteFIFOQueue) readToChan() { // Default backoff values backOffTime := time.Millisecond * 100 + backOffTimer := time.NewTimer(0) + util.StopTimer(backOffTimer) + + paused, _ := q.IsPausedIsResumed() loop: for { - err := q.doPop() - if err == errQueueEmpty { - log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + select { + case <-paused: + log.Trace("Queue %s pausing", q.name) + _, resumed := q.IsPausedIsResumed() + select { - case <-q.pushed: - // reset backOffTime - backOffTime = 100 * time.Millisecond - continue loop + case <-resumed: + paused, _ = q.IsPausedIsResumed() + log.Trace("Queue %s resuming", q.name) + if q.HasNoWorkerScaling() { + log.Warn( + "Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", q.name) + q.Pause() + continue loop + } case <-q.shutdownCtx.Done(): - // Oops we've been shutdown whilst waiting - // Make sure the worker pool is shutdown too + // tell the pool to shutdown. q.baseCtxCancel() return + case data := <-q.dataChan: + if err := q.PushBack(data); err != nil { + log.Error("Unable to push back data into queue %s", q.name) + } + atomic.AddInt64(&q.numInQueue, -1) } + default: } - // Reset the backOffTime if there is no error or an unmarshalError - if err == nil || err == errUnmarshal { - backOffTime = 100 * time.Millisecond + // empty the pushed channel + select { + case <-q.pushed: + default: } + err := q.doPop() + + util.StopTimer(backOffTimer) + if err != nil { + if err == errQueueEmpty && q.waitOnEmpty { + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + + // reset the backoff time but don't set the timer + backOffTime = 100 * time.Millisecond + } else if err == errUnmarshal { + // reset the timer and backoff + backOffTime = 100 * time.Millisecond + backOffTimer.Reset(backOffTime) + } else { + // backoff + backOffTimer.Reset(backOffTime) + } + // Need to Backoff select { case <-q.shutdownCtx.Done(): @@ -174,8 +246,13 @@ loop: // Make sure the worker pool is shutdown too q.baseCtxCancel() return - case <-time.After(backOffTime): - // OK we've waited - so backoff a bit + case <-q.pushed: + // Data has been pushed to the fifo (or flush has been called) + // reset the backoff time + backOffTime = 100 * time.Millisecond + continue loop + case <-backOffTimer.C: + // Calculate the next backoff time backOffTime += backOffTime / 2 if backOffTime > maxBackOffTime { backOffTime = maxBackOffTime @@ -183,6 +260,10 @@ loop: continue loop } } + + // Reset the backoff time + backOffTime = 100 * time.Millisecond + select { case <-q.shutdownCtx.Done(): // Oops we've been shutdown @@ -289,9 +370,8 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOUniqueQueue{ + q := &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -302,7 +382,17 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun workers: config.Workers, name: config.Name, }, - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Has checks if the provided data is in the queue |