1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"fmt"
"code.gitea.io/gitea/modules/graceful"
"github.com/go-redis/redis/v8"
)
// RedisUniqueQueueType is the Type identifier of the redis-backed unique
// queue. It is the key under which NewRedisUniqueQueue is registered in
// queuesMap by this file's init function.
const RedisUniqueQueueType Type = "unique-redis"
// RedisUniqueQueue is a unique queue whose storage is a RedisUniqueByteFIFO.
// All queue behaviour comes from the embedded ByteFIFOUniqueQueue; instances
// are built by NewRedisUniqueQueue.
type RedisUniqueQueue struct {
*ByteFIFOUniqueQueue
}
// RedisUniqueQueueConfiguration is the configuration for the redis queue.
// It combines the two configurations consumed by NewRedisUniqueQueue.
type RedisUniqueQueueConfiguration struct {
// ByteFIFOQueueConfiguration is handed to NewByteFIFOUniqueQueue.
ByteFIFOQueueConfiguration
// RedisUniqueByteFIFOConfiguration is handed to NewRedisUniqueByteFIFO.
RedisUniqueByteFIFOConfiguration
}
// NewRedisUniqueQueue builds a unique queue backed by redis and registers it
// with the queue manager.
//
// cfg is converted to a RedisUniqueQueueConfiguration via toConfig. When the
// configuration leaves the set name empty, the uniqueness set is named after
// the queue with a "_unique" suffix.
//
// Please note that this Queue does not guarantee that a particular
// task cannot be processed twice or more at the same time. Uniqueness is
// only guaranteed whilst the task is waiting in the queue.
func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
	rawConfig, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
	if err != nil {
		return nil, err
	}
	config := rawConfig.(RedisUniqueQueueConfiguration)

	// Storage layer: the redis list plus the set used for uniqueness.
	redisFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
	if err != nil {
		return nil, err
	}
	if redisFIFO.setName == "" {
		redisFIFO.setName = redisFIFO.queueName + "_unique"
	}

	// Queue layer on top of the storage.
	inner, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, redisFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
	if err != nil {
		return nil, err
	}

	// The context used for redis commands is built from the queue's
	// termination channel.
	terminated := inner.IsTerminated()
	terminatedErr := fmt.Errorf("queue has been terminated")
	redisFIFO.ctx = graceful.NewChannelContext(terminated, terminatedErr)

	queue := &RedisUniqueQueue{ByteFIFOUniqueQueue: inner}
	queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
	return queue, nil
}
// Compile-time check that *RedisUniqueByteFIFO satisfies UniqueByteFIFO.
var _ UniqueByteFIFO = (*RedisUniqueByteFIFO)(nil)
// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient.
// It pairs the redis list of the embedded RedisByteFIFO with a redis set that
// records which items are currently queued.
type RedisUniqueByteFIFO struct {
// RedisByteFIFO supplies the client, context and queue name used by the
// methods of this type.
RedisByteFIFO
// setName is the key of the redis set consulted by PushFunc, Pop and Has.
setName string
}
// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
type RedisUniqueByteFIFOConfiguration struct {
// RedisByteFIFOConfiguration is passed to NewRedisByteFIFO.
RedisByteFIFOConfiguration
// SetName is the key of the redis set used to track queued items. It may be
// left empty: NewRedisUniqueQueue then uses the queue name plus "_unique".
SetName string
}
// NewRedisUniqueByteFIFO constructs a RedisUniqueByteFIFO from config.
//
// The underlying RedisByteFIFO is created with NewRedisByteFIFO and copied
// into the result; any error from that constructor is returned unchanged.
// config.SetName is stored as-is, including when it is empty.
func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
	base, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
	if err != nil {
		return nil, err
	}

	return &RedisUniqueByteFIFO{
		RedisByteFIFO: *base,
		setName:       config.SetName,
	}, nil
}
// PushFunc pushes data to the end of the fifo and calls the callback if it is added.
//
// The data is first added to the uniqueness set. If it is already a member,
// ErrAlreadyInQueue is returned and neither fn nor the list push is attempted.
// Otherwise fn (when non-nil) is called and, if it succeeds, data is appended
// to the redis list.
//
// If fn or the list push fails, the set entry added above is removed again on
// a best-effort basis. Without this rollback the data would stay marked as
// queued without being in the list, and every later push of the same data
// would be rejected with ErrAlreadyInQueue.
//
// The returned error is the one from SAdd, fn or RPush, unchanged.
func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
	added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result()
	if err != nil {
		return err
	}
	if added == 0 {
		return ErrAlreadyInQueue
	}

	if fn != nil {
		err = fn()
	}
	if err == nil {
		err = fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
	}
	if err != nil {
		// Undo the SAdd so the item can be pushed again later. The SRem error
		// is deliberately dropped: the original failure is what the caller
		// needs to see, and there is nothing more to do if the rollback fails.
		_ = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err()
		return err
	}
	return nil
}
// Pop removes and returns the item at the start of the fifo.
//
// A redis.Nil reply from LPop is not treated as an error; in that case, and
// whenever the popped item is empty, the item is returned with a nil error
// and the uniqueness set is left untouched. For a non-empty item the matching
// entry is removed from the set, and any error from that removal is returned
// together with the item.
func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
	item, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
	switch {
	case err != nil && err != redis.Nil:
		return item, err
	case len(item) == 0:
		return item, nil
	}

	// The item has left the list, so it no longer counts as queued.
	return item, fifo.client.SRem(fifo.ctx, fifo.setName, item).Err()
}
// Has reports whether data is a member of the fifo's uniqueness set, along
// with any error returned by the SIsMember command.
func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
	membership := fifo.client.SIsMember(fifo.ctx, fifo.setName, data)
	return membership.Result()
}
// init registers NewRedisUniqueQueue in queuesMap under RedisUniqueQueueType.
func init() {
queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
}
|