diff options
Diffstat (limited to 'modules/queue/queue_redis.go')
-rw-r--r-- | modules/queue/queue_redis.go | 102 |
1 files changed, 47 insertions, 55 deletions
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 7d3efb9cff..0167c1ec49 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -9,9 +9,9 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "strings" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -25,13 +25,14 @@ const RedisQueueType Type = "redis" type redisClient interface { RPush(key string, args ...interface{}) *redis.IntCmd LPop(key string) *redis.StringCmd + LLen(key string) *redis.IntCmd Ping() *redis.StatusCmd Close() error } // RedisQueue redis queue type RedisQueue struct { - pool *WorkerPool + *WorkerPool client redisClient queueName string closed chan struct{} @@ -44,19 +45,14 @@ type RedisQueue struct { // RedisQueueConfiguration is the configuration for the redis queue type RedisQueueConfiguration struct { - Network string - Addresses string - Password string - DBIndex int - BatchLength int - QueueLength int - QueueName string - Workers int - MaxWorkers int - BlockTimeout time.Duration - BoostTimeout time.Duration - BoostWorkers int - Name string + WorkerPoolConfiguration + Network string + Addresses string + Password string + DBIndex int + QueueName string + Workers int + Name string } // NewRedisQueue creates single redis or cluster redis queue @@ -69,21 +65,8 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) dbs := strings.Split(config.Addresses, ",") - dataChan := make(chan Data, config.QueueLength) - ctx, cancel := context.WithCancel(context.Background()) - var queue = &RedisQueue{ - pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, - maxNumberOfWorkers: config.MaxWorkers, - }, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), queueName: config.QueueName, exemplar: exemplar, closed: make(chan struct{}), @@ -108,7 +91,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) if err := queue.client.Ping().Err(); err != nil { return nil, err } - queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool) + queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) return queue, nil } @@ -117,9 +100,10 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), r.Shutdown) atTerminate(context.Background(), r.Terminate) + log.Debug("RedisQueue: %s Starting", r.name) go func() { - _ = r.pool.AddWorkers(r.workers, 0) + _ = r.AddWorkers(r.workers, 0) }() go r.readToChan() @@ -127,12 +111,12 @@ func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) log.Trace("RedisQueue: %s Waiting til closed", r.name) <-r.closed log.Trace("RedisQueue: %s Waiting til done", r.name) - r.pool.Wait() + r.Wait() log.Trace("RedisQueue: %s Waiting til cleaned", r.name) ctx, cancel := context.WithCancel(context.Background()) atTerminate(ctx, cancel) - r.pool.CleanUp(ctx) + r.CleanUp(ctx) cancel() } @@ -141,53 +125,43 @@ func (r *RedisQueue) readToChan() { select { case <-r.closed: // tell the pool to shutdown - r.pool.cancel() + r.cancel() return default: + atomic.AddInt64(&r.numInQueue, 1) bs, err := r.client.LPop(r.queueName).Bytes() if err != nil && err != redis.Nil { log.Error("RedisQueue: %s Error on LPop: %v", r.name, err) + atomic.AddInt64(&r.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } if len(bs) == 0 { + atomic.AddInt64(&r.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } - var data Data - if r.exemplar != nil { - t := reflect.TypeOf(r.exemplar) - n := reflect.New(t) - ne := n.Elem() - err = json.Unmarshal(bs, ne.Addr().Interface()) - data = ne.Interface().(Data) - } else { - err = json.Unmarshal(bs, &data) - } + data, err := unmarshalAs(bs, r.exemplar) if err != nil { log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err) + atomic.AddInt64(&r.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } log.Trace("RedisQueue: %s Task found: %#v", r.name, data) - r.pool.Push(data) + r.WorkerPool.Push(data) + atomic.AddInt64(&r.numInQueue, -1) } } } // Push implements Queue func (r *RedisQueue) Push(data Data) error { - if r.exemplar != nil { - // Assert data is of same type as r.exemplar - value := reflect.ValueOf(data) - t := value.Type() - exemplarType := reflect.ValueOf(r.exemplar).Type() - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) - } + if !assignableTo(data, r.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) } bs, err := json.Marshal(data) if err != nil { @@ -196,9 +170,22 @@ func (r *RedisQueue) Push(data Data) error { return r.client.RPush(r.queueName, bs).Err() } +// IsEmpty checks if the queue is empty +func (r *RedisQueue) IsEmpty() bool { + if !r.WorkerPool.IsEmpty() { + return false + } + length, err := r.client.LLen(r.queueName).Result() + if err != nil { + log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err) + return false + } + return length == 0 +} + // Shutdown processing from this queue func (r *RedisQueue) Shutdown() { - log.Trace("Shutdown: %s", r.name) + log.Trace("RedisQueue: %s Shutting down", r.name) r.lock.Lock() select { case <-r.closed: @@ -206,11 +193,12 @@ func (r *RedisQueue) Shutdown() { close(r.closed) } r.lock.Unlock() + log.Debug("RedisQueue: %s Shutdown", r.name) } // Terminate this queue and close the queue func (r *RedisQueue) Terminate() { - log.Trace("Terminating: %s", r.name) + log.Trace("RedisQueue: %s Terminating", r.name) r.Shutdown() r.lock.Lock() select { @@ -219,10 +207,14 @@ func (r *RedisQueue) Terminate() { default: close(r.terminated) r.lock.Unlock() + if log.IsDebug() { + log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName)) + } if err := r.client.Close(); err != nil { log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) } } + log.Debug("RedisQueue: %s Terminated", r.name) } // Name returns the name of this queue |