diff options
author | zeripath <art27@cantab.net> | 2020-02-02 23:19:58 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 23:19:58 +0000 |
commit | 2c903383b5154795b90e4b4ed8eaadc6fac17a13 (patch) | |
tree | d5ca361d9597e027ad92f1e02a841be1d266b554 /modules/queue/queue_redis.go | |
parent | b4914249ee389a733e7dcfd2df20708ab3215827 (diff) | |
download | gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.tar.gz gitea-2c903383b5154795b90e4b4ed8eaadc6fac17a13.zip |
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0
This adds functionality for Unique Queues
* Add UniqueQueue interface and functions to create them
* Add UniqueQueue implementations
* Move TestPullRequests over to use UniqueQueue
* Reduce code duplication
* Add bytefifos
* Ensure invalid types are logged
* Fix close race in PersistableChannelQueue Shutdown
Diffstat (limited to 'modules/queue/queue_redis.go')
-rw-r--r-- | modules/queue/queue_redis.go | 238 |
1 files changed, 79 insertions, 159 deletions
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 0167c1ec49..8a395cd5aa 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -5,14 +5,8 @@ package queue import ( - "context" - "encoding/json" "errors" - "fmt" "strings" - "sync" - "sync/atomic" - "time" "code.gitea.io/gitea/modules/log" @@ -22,204 +16,130 @@ import ( // RedisQueueType is the type for redis queue const RedisQueueType Type = "redis" +// RedisQueueConfiguration is the configuration for the redis queue +type RedisQueueConfiguration struct { + ByteFIFOQueueConfiguration + RedisByteFIFOConfiguration +} + +// RedisQueue redis queue +type RedisQueue struct { + *ByteFIFOQueue +} + +// NewRedisQueue creates single redis or cluster redis queue +func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { + configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) + if err != nil { + return nil, err + } + config := configInterface.(RedisQueueConfiguration) + + byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration) + if err != nil { + return nil, err + } + + byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar) + if err != nil { + return nil, err + } + + queue := &RedisQueue{ + ByteFIFOQueue: byteFIFOQueue, + } + + queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) + + return queue, nil +} + type redisClient interface { RPush(key string, args ...interface{}) *redis.IntCmd LPop(key string) *redis.StringCmd LLen(key string) *redis.IntCmd + SAdd(key string, members ...interface{}) *redis.IntCmd + SRem(key string, members ...interface{}) *redis.IntCmd + SIsMember(key string, member interface{}) *redis.BoolCmd Ping() *redis.StatusCmd Close() error } -// RedisQueue redis queue -type RedisQueue struct { - *WorkerPool - client redisClient - queueName string - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex +var _ (ByteFIFO) = &RedisByteFIFO{} + +// RedisByteFIFO represents a ByteFIFO formed from a redisClient +type RedisByteFIFO struct { + client redisClient + queueName string } -// RedisQueueConfiguration is the configuration for the redis queue -type RedisQueueConfiguration struct { - WorkerPoolConfiguration +// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO +type RedisByteFIFOConfiguration struct { Network string Addresses string Password string DBIndex int QueueName string - Workers int - Name string } -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { - configInterface, err := toConfig(RedisQueueConfiguration{}, cfg) - if err != nil { - return nil, err +// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient +func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) { + fifo := &RedisByteFIFO{ + queueName: config.QueueName, } - config := configInterface.(RedisQueueConfiguration) - dbs := strings.Split(config.Addresses, ",") - - var queue = &RedisQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - queueName: config.QueueName, - exemplar: exemplar, - closed: make(chan struct{}), - terminated: make(chan struct{}), - workers: config.Workers, - name: config.Name, - } if len(dbs) == 0 { return nil, errors.New("no redis host specified") } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ + fifo.client = redis.NewClient(&redis.Options{ Network: config.Network, Addr: strings.TrimSpace(dbs[0]), // use default Addr Password: config.Password, // no password set DB: config.DBIndex, // use default DB }) } else { - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ + fifo.client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: dbs, }) } - if err := queue.client.Ping().Err(); err != nil { + if err := fifo.client.Ping().Err(); err != nil { return nil, err } - queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) - - return queue, nil + return fifo, nil } -// Run runs the redis queue -func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), r.Shutdown) - atTerminate(context.Background(), r.Terminate) - log.Debug("RedisQueue: %s Starting", r.name) - - go func() { - _ = r.AddWorkers(r.workers, 0) - }() - - go r.readToChan() - - log.Trace("RedisQueue: %s Waiting til closed", r.name) - <-r.closed - log.Trace("RedisQueue: %s Waiting til done", r.name) - r.Wait() - - log.Trace("RedisQueue: %s Waiting til cleaned", r.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - r.CleanUp(ctx) - cancel() -} - -func (r *RedisQueue) readToChan() { - for { - select { - case <-r.closed: - // tell the pool to shutdown - r.cancel() - return - default: - atomic.AddInt64(&r.numInQueue, 1) - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil && err != redis.Nil { - log.Error("RedisQueue: %s Error on LPop: %v", r.name, err) - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - if len(bs) == 0 { - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - data, err := unmarshalAs(bs, r.exemplar) - if err != nil { - log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err) - atomic.AddInt64(&r.numInQueue, -1) - time.Sleep(time.Millisecond * 100) - continue - } - - log.Trace("RedisQueue: %s Task found: %#v", r.name, data) - r.WorkerPool.Push(data) - atomic.AddInt64(&r.numInQueue, -1) +// PushFunc pushes data to the end of the fifo and calls the callback if it is added +func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error { + if fn != nil { + if err := fn(); err != nil { + return err } } + return fifo.client.RPush(fifo.queueName, data).Err() } -// Push implements Queue -func (r *RedisQueue) Push(data Data) error { - if !assignableTo(data, r.exemplar) { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) - } - bs, err := json.Marshal(data) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -} - -// IsEmpty checks if the queue is empty -func (r *RedisQueue) IsEmpty() bool { - if !r.WorkerPool.IsEmpty() { - return false +// Pop pops data from the start of the fifo +func (fifo *RedisByteFIFO) Pop() ([]byte, error) { + data, err := fifo.client.LPop(fifo.queueName).Bytes() + if err != nil && err == redis.Nil { + return data, nil } - length, err := r.client.LLen(r.queueName).Result() - if err != nil { - log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err) - return false - } - return length == 0 + return data, err } -// Shutdown processing from this queue -func (r *RedisQueue) Shutdown() { - log.Trace("RedisQueue: %s Shutting down", r.name) - r.lock.Lock() - select { - case <-r.closed: - default: - close(r.closed) - } - r.lock.Unlock() - log.Debug("RedisQueue: %s Shutdown", r.name) +// Close this fifo +func (fifo *RedisByteFIFO) Close() error { + return fifo.client.Close() } -// Terminate this queue and close the queue -func (r *RedisQueue) Terminate() { - log.Trace("RedisQueue: %s Terminating", r.name) - r.Shutdown() - r.lock.Lock() - select { - case <-r.terminated: - r.lock.Unlock() - default: - close(r.terminated) - r.lock.Unlock() - if log.IsDebug() { - log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName)) - } - if err := r.client.Close(); err != nil { - log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) - } +// Len returns the length of the fifo +func (fifo *RedisByteFIFO) Len() int64 { + val, err := fifo.client.LLen(fifo.queueName).Result() + if err != nil { + log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err) + return -1 } - log.Debug("RedisQueue: %s Terminated", r.name) -} - -// Name returns the name of this queue -func (r *RedisQueue) Name() string { - return r.name + return val } func init() { |