From a82fd98d5368a75cbcf6b74c12f58f3f81e66662 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 22 Jan 2022 21:22:14 +0000 Subject: Pause queues (#15928) * Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton * Create pushback interface Signed-off-by: Andrew Thornton * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton * Wire in UI for pausing Signed-off-by: Andrew Thornton * add testcases and fix a few issues Signed-off-by: Andrew Thornton * fix build Signed-off-by: Andrew Thornton * prevent "race" in the test Signed-off-by: Andrew Thornton * fix jsoniter mismerge Signed-off-by: Andrew Thornton * fix conflicts Signed-off-by: Andrew Thornton * fix format Signed-off-by: Andrew Thornton * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton * Use StopTimer Signed-off-by: Andrew Thornton Co-authored-by: Lauris BH Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick Co-authored-by: wxiaoguang --- modules/queue/unique_queue_redis.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'modules/queue/unique_queue_redis.go') diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 7474c09665..477d5dd81f 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -105,6 +105,18 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn f return fifo.client.RPush(ctx, fifo.queueName, data).Err() } +// PushBack pushes data to the top of the fifo +func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error { + added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result() + if err != nil { + return err + } + if added == 0 { + return ErrAlreadyInQueue + } + return fifo.client.LPush(ctx, fifo.queueName, data).Err() +} + // Pop pops data from the start of the fifo func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() -- cgit v1.2.3