summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_channel.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-01-22 21:22:14 +0000
committerGitHub <noreply@github.com>2022-01-22 21:22:14 +0000
commita82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch)
treecb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/queue/queue_channel.go
parent27ee01e1e866f2f13603af65224ddae77d5149d7 (diff)
downloadgitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz
gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip
Pause queues (#15928)
* Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton <art27@cantab.net> * Create pushback interface Signed-off-by: Andrew Thornton <art27@cantab.net> * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton <art27@cantab.net> * Wire in UI for pausing Signed-off-by: Andrew Thornton <art27@cantab.net> * add testcases and fix a few issues Signed-off-by: Andrew Thornton <art27@cantab.net> * fix build Signed-off-by: Andrew Thornton <art27@cantab.net> * prevent "race" in the test Signed-off-by: Andrew Thornton <art27@cantab.net> * fix jsoniter mismerge Signed-off-by: Andrew Thornton <art27@cantab.net> * fix conflicts Signed-off-by: Andrew Thornton <art27@cantab.net> * fix format Signed-off-by: Andrew Thornton <art27@cantab.net> * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton <art27@cantab.net> * Use StopTimer Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/queue/queue_channel.go')
-rw-r--r--modules/queue/queue_channel.go54
1 files changed, 53 insertions, 1 deletions
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 4df64b69ee..7de9c17c86 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -7,6 +7,8 @@ package queue
import (
"context"
"fmt"
+ "sync/atomic"
+ "time"
"code.gitea.io/gitea/modules/log"
)
@@ -51,7 +53,6 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
@@ -60,6 +61,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
workers: config.Workers,
name: config.Name,
}
+ queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data {
+ unhandled := handle(data...)
+ if len(unhandled) > 0 {
+ // We can only pushback to the channel if we're paused.
+ if queue.IsPaused() {
+ atomic.AddInt64(&queue.numInQueue, int64(len(unhandled)))
+ go func() {
+ for _, datum := range data {
+ queue.dataChan <- datum
+ }
+ }()
+ return nil
+ }
+ }
+ return unhandled
+ }, config.WorkerPoolConfiguration)
+
queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
return queue, nil
}
@@ -81,6 +99,39 @@ func (q *ChannelQueue) Push(data Data) error {
return nil
}
+// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
+func (q *ChannelQueue) Flush(timeout time.Duration) error {
+ if q.IsPaused() {
+ return nil
+ }
+ ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
+ defer cancel()
+ return q.FlushWithContext(ctx)
+}
+
+// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
+func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
+ log.Trace("ChannelQueue: %d Flush", q.qid)
+ paused, _ := q.IsPausedIsResumed()
+ for {
+ select {
+ case <-paused:
+ return nil
+ case data := <-q.dataChan:
+ if unhandled := q.handle(data); unhandled != nil {
+ log.Error("Unhandled Data whilst flushing queue %d", q.qid)
+ }
+ atomic.AddInt64(&q.numInQueue, -1)
+ case <-q.baseCtx.Done():
+ return q.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ return nil
+ }
+ }
+}
+
// Shutdown processing from this queue
func (q *ChannelQueue) Shutdown() {
q.lock.Lock()
@@ -94,6 +145,7 @@ func (q *ChannelQueue) Shutdown() {
log.Trace("ChannelQueue: %s Shutting down", q.name)
go func() {
log.Trace("ChannelQueue: %s Flushing", q.name)
+ // We can't use Cleanup here because that will close the channel
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
return