diff options
author | zeripath <art27@cantab.net> | 2022-01-22 21:22:14 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-22 21:22:14 +0000 |
commit | a82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch) | |
tree | cb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/queue/queue_channel.go | |
parent | 27ee01e1e866f2f13603af65224ddae77d5149d7 (diff) | |
download | gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip |
Pause queues (#15928)
* Start adding mechanism to return unhandled data
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Create pushback interface
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add Pausable interface to WorkerPool and Manager
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and PushBack for the bytefifos
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Wire in UI for pausing
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add testcases and fix a few issues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix build
Signed-off-by: Andrew Thornton <art27@cantab.net>
* prevent "race" in the test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix jsoniter mismerge
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix conflicts
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix format
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add warnings for no worker configurations and prevent data-loss with redis/levelqueue
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Use StopTimer
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: 6543 <6543@obermui.de>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/queue/queue_channel.go')
-rw-r--r-- | modules/queue/queue_channel.go | 54 |
1 files changed, 53 insertions, 1 deletions
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 4df64b69ee..7de9c17c86 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,6 +7,8 @@ package queue import ( "context" "fmt" + "sync/atomic" + "time" "code.gitea.io/gitea/modules/log" ) @@ -51,7 +53,6 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -60,6 +61,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro workers: config.Workers, name: config.Name, } + queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data { + unhandled := handle(data...) + if len(unhandled) > 0 { + // We can only pushback to the channel if we're paused. + if queue.IsPaused() { + atomic.AddInt64(&queue.numInQueue, int64(len(unhandled))) + go func() { + for _, datum := range data { + queue.dataChan <- datum + } + }() + return nil + } + } + return unhandled + }, config.WorkerPoolConfiguration) + queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } @@ -81,6 +99,39 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (q *ChannelQueue) Flush(timeout time.Duration) error { + if q.IsPaused() { + return nil + } + ctx, cancel := q.commonRegisterWorkers(1, timeout, true) + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +func (q *ChannelQueue) FlushWithContext(ctx context.Context) error { + log.Trace("ChannelQueue: %d Flush", q.qid) + paused, _ := q.IsPausedIsResumed() + for { + select { + case <-paused: + return nil + case data := <-q.dataChan: + if unhandled := q.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", q.qid) + } + atomic.AddInt64(&q.numInQueue, -1) + case <-q.baseCtx.Done(): + return q.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + // Shutdown processing from this queue func (q *ChannelQueue) Shutdown() { q.lock.Lock() @@ -94,6 +145,7 @@ func (q *ChannelQueue) Shutdown() { log.Trace("ChannelQueue: %s Shutting down", q.name) go func() { log.Trace("ChannelQueue: %s Flushing", q.name) + // We can't use Cleanup here because that will close the channel if err := q.FlushWithContext(q.terminateCtx); err != nil { log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) return |