diff options
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/helper.go | 63 | ||||
-rw-r--r-- | modules/queue/manager.go | 159 | ||||
-rw-r--r-- | modules/queue/queue.go | 43 | ||||
-rw-r--r-- | modules/queue/queue_channel.go | 59 | ||||
-rw-r--r-- | modules/queue/queue_channel_test.go | 30 | ||||
-rw-r--r-- | modules/queue/queue_disk.go | 87 | ||||
-rw-r--r-- | modules/queue/queue_disk_channel.go | 126 | ||||
-rw-r--r-- | modules/queue/queue_disk_test.go | 36 | ||||
-rw-r--r-- | modules/queue/queue_redis.go | 102 | ||||
-rw-r--r-- | modules/queue/queue_wrapped.go | 97 | ||||
-rw-r--r-- | modules/queue/setting.go | 24 | ||||
-rw-r--r-- | modules/queue/workerpool.go | 94 |
12 files changed, 628 insertions, 292 deletions
diff --git a/modules/queue/helper.go b/modules/queue/helper.go new file mode 100644 index 0000000000..e6fb1b94f9 --- /dev/null +++ b/modules/queue/helper.go @@ -0,0 +1,63 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "encoding/json" + "reflect" +) + +// toConfig will attempt to convert a given configuration cfg into the provided exemplar type. +// +// It will tolerate the cfg being passed as a []byte or string of a json representation of the +// exemplar or the correct type of the exemplar itself +func toConfig(exemplar, cfg interface{}) (interface{}, error) { + if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { + return cfg, nil + } + + configBytes, ok := cfg.([]byte) + if !ok { + configStr, ok := cfg.(string) + if !ok { + return nil, ErrInvalidConfiguration{cfg: cfg} + } + configBytes = []byte(configStr) + } + newVal := reflect.New(reflect.TypeOf(exemplar)) + if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { + return nil, ErrInvalidConfiguration{cfg: cfg, err: err} + } + return newVal.Elem().Interface(), nil +} + +// unmarshalAs will attempt to unmarshal provided bytes as the provided exemplar +func unmarshalAs(bs []byte, exemplar interface{}) (data Data, err error) { + if exemplar != nil { + t := reflect.TypeOf(exemplar) + n := reflect.New(t) + ne := n.Elem() + err = json.Unmarshal(bs, ne.Addr().Interface()) + data = ne.Interface().(Data) + } else { + err = json.Unmarshal(bs, &data) + } + + return +} + +// assignableTo will check if provided data is assignable to the same type as the exemplar +// if the provided exemplar is nil then it will always return true +func assignableTo(data Data, exemplar interface{}) bool { + if exemplar == nil { + return true + } + + // Assert data is of same type as exemplar + t := reflect.TypeOf(data) + exemplarType := reflect.TypeOf(exemplar) + + return t.AssignableTo(exemplarType) && data != nil +} diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 88b2644848..a6734787a9 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -26,36 +26,57 @@ type Manager struct { Queues map[int64]*ManagedQueue } -// ManagedQueue represents a working queue inheriting from Gitea. +// ManagedQueue represents a working queue with a Pool of workers. +// +// Although a ManagedQueue should really represent a Queue this does not +// necessarily have to be the case. This could be used to describe any queue.WorkerPool. type ManagedQueue struct { mutex sync.Mutex QID int64 - Queue Queue Type Type Name string Configuration interface{} ExemplarType string - Pool ManagedPool + Managed interface{} counter int64 PoolWorkers map[int64]*PoolWorkers } +// Flushable represents a pool or queue that is flushable +type Flushable interface { + // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager + Flush(time.Duration) error + // FlushWithContext is very similar to Flush + // NB: The worker will not be registered with the manager. + FlushWithContext(ctx context.Context) error + // IsEmpty will return if the managed pool is empty and has no work + IsEmpty() bool +} + // ManagedPool is a simple interface to get certain details from a worker pool type ManagedPool interface { + // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group AddWorkers(number int, timeout time.Duration) context.CancelFunc + // NumberOfWorkers returns the total number of workers in the pool NumberOfWorkers() int + // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to MaxNumberOfWorkers() int + // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to SetMaxNumberOfWorkers(int) + // BoostTimeout returns the current timeout for worker groups created during a boost BoostTimeout() time.Duration + // BlockTimeout returns the timeout the internal channel can block for before a boost would occur BlockTimeout() time.Duration + // BoostWorkers sets the number of workers to be created during a boost BoostWorkers() int - SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + // SetPoolSettings sets the user updatable settings for the pool + SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) } // ManagedQueueList implements the sort.Interface type ManagedQueueList []*ManagedQueue -// PoolWorkers represents a working queue inheriting from Gitea. +// PoolWorkers represents a group of workers working on a queue type PoolWorkers struct { PID int64 Workers int @@ -63,9 +84,10 @@ type PoolWorkers struct { Timeout time.Time HasTimeout bool Cancel context.CancelFunc + IsFlusher bool } -// PoolWorkersList implements the sort.Interface +// PoolWorkersList implements the sort.Interface for PoolWorkers type PoolWorkersList []*PoolWorkers func init() { @@ -83,27 +105,28 @@ func GetManager() *Manager { } // Add adds a queue to this manager -func (m *Manager) Add(queue Queue, +func (m *Manager) Add(managed interface{}, t Type, configuration, - exemplar interface{}, - pool ManagedPool) int64 { + exemplar interface{}) int64 { cfg, _ := json.Marshal(configuration) mq := &ManagedQueue{ - Queue: queue, Type: t, Configuration: string(cfg), ExemplarType: reflect.TypeOf(exemplar).String(), PoolWorkers: make(map[int64]*PoolWorkers), - Pool: pool, + Managed: managed, } m.mutex.Lock() m.counter++ mq.QID = m.counter mq.Name = fmt.Sprintf("queue-%d", mq.QID) - if named, ok := queue.(Named); ok { - mq.Name = named.Name() + if named, ok := managed.(Named); ok { + name := named.Name() + if len(name) > 0 { + mq.Name = name + } } m.Queues[mq.QID] = mq m.mutex.Unlock() @@ -127,6 +150,64 @@ func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue { return m.Queues[qid] } +// FlushAll flushes all the flushable queues attached to this manager +func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error { + var ctx context.Context + var cancel context.CancelFunc + start := time.Now() + end := start + hasTimeout := false + if timeout > 0 { + ctx, cancel = context.WithTimeout(baseCtx, timeout) + end = start.Add(timeout) + hasTimeout = true + } else { + ctx, cancel = context.WithCancel(baseCtx) + } + defer cancel() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + mqs := m.ManagedQueues() + wg := sync.WaitGroup{} + wg.Add(len(mqs)) + allEmpty := true + for _, mq := range mqs { + if mq.IsEmpty() { + wg.Done() + continue + } + allEmpty = false + if flushable, ok := mq.Managed.(Flushable); ok { + go func() { + localCtx, localCancel := context.WithCancel(ctx) + pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) + err := flushable.FlushWithContext(localCtx) + if err != nil && err != ctx.Err() { + cancel() + } + mq.CancelWorkers(pid) + localCancel() + wg.Done() + }() + } else { + wg.Done() + } + + } + if allEmpty { + break + } + wg.Wait() + } + return nil + +} + // ManagedQueues returns the managed queues func (m *Manager) ManagedQueues() []*ManagedQueue { m.mutex.Lock() @@ -152,7 +233,7 @@ func (q *ManagedQueue) Workers() []*PoolWorkers { } // RegisterWorkers registers workers to this queue -func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 { +func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 { q.mutex.Lock() defer q.mutex.Unlock() q.counter++ @@ -163,6 +244,7 @@ func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout b Timeout: timeout, HasTimeout: hasTimeout, Cancel: cancel, + IsFlusher: isFlusher, } return q.counter } @@ -191,57 +273,74 @@ func (q *ManagedQueue) RemoveWorkers(pid int64) { // AddWorkers adds workers to the queue if it has registered an add worker function func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc { - if q.Pool != nil { + if pool, ok := q.Managed.(ManagedPool); ok { // the cancel will be added to the pool workers description above - return q.Pool.AddWorkers(number, timeout) + return pool.AddWorkers(number, timeout) } return nil } +// Flush flushes the queue with a timeout +func (q *ManagedQueue) Flush(timeout time.Duration) error { + if flushable, ok := q.Managed.(Flushable); ok { + // the cancel will be added to the pool workers description above + return flushable.Flush(timeout) + } + return nil +} + +// IsEmpty returns if the queue is empty +func (q *ManagedQueue) IsEmpty() bool { + if flushable, ok := q.Managed.(Flushable); ok { + return flushable.IsEmpty() + } + return true +} + // NumberOfWorkers returns the number of workers in the queue func (q *ManagedQueue) NumberOfWorkers() int { - if q.Pool != nil { - return q.Pool.NumberOfWorkers() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.NumberOfWorkers() } return -1 } // MaxNumberOfWorkers returns the maximum number of workers for the pool func (q *ManagedQueue) MaxNumberOfWorkers() int { - if q.Pool != nil { - return q.Pool.MaxNumberOfWorkers() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.MaxNumberOfWorkers() } return 0 } // BoostWorkers returns the number of workers for a boost func (q *ManagedQueue) BoostWorkers() int { - if q.Pool != nil { - return q.Pool.BoostWorkers() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.BoostWorkers() } return -1 } // BoostTimeout returns the timeout of the next boost func (q *ManagedQueue) BoostTimeout() time.Duration { - if q.Pool != nil { - return q.Pool.BoostTimeout() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.BoostTimeout() } return 0 } // BlockTimeout returns the timeout til the next boost func (q *ManagedQueue) BlockTimeout() time.Duration { - if q.Pool != nil { - return q.Pool.BlockTimeout() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.BlockTimeout() } return 0 } -// SetSettings sets the setable boost values -func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { - if q.Pool != nil { - q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout) +// SetPoolSettings sets the setable boost values +func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { + if pool, ok := q.Managed.(ManagedPool); ok { + pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout) } } diff --git a/modules/queue/queue.go b/modules/queue/queue.go index d458a7d506..094699d4af 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -6,9 +6,8 @@ package queue import ( "context" - "encoding/json" "fmt" - "reflect" + "time" ) // ErrInvalidConfiguration is called when there is invalid configuration for a queue @@ -53,8 +52,11 @@ type Named interface { Name() string } -// Queue defines an interface to save an issue indexer queue +// Queue defines an interface of a queue-like item +// +// Queues will handle their own contents in the Run method type Queue interface { + Flushable Run(atShutdown, atTerminate func(context.Context, func())) Push(Data) error } @@ -71,32 +73,27 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro type DummyQueue struct { } -// Run starts to run the queue +// Run does nothing func (b *DummyQueue) Run(_, _ func(context.Context, func())) {} -// Push pushes data to the queue +// Push fakes a push of data to the queue func (b *DummyQueue) Push(Data) error { return nil } -func toConfig(exemplar, cfg interface{}) (interface{}, error) { - if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { - return cfg, nil - } +// Flush always returns nil +func (b *DummyQueue) Flush(time.Duration) error { + return nil +} - configBytes, ok := cfg.([]byte) - if !ok { - configStr, ok := cfg.(string) - if !ok { - return nil, ErrInvalidConfiguration{cfg: cfg} - } - configBytes = []byte(configStr) - } - newVal := reflect.New(reflect.TypeOf(exemplar)) - if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { - return nil, ErrInvalidConfiguration{cfg: cfg, err: err} - } - return newVal.Elem().Interface(), nil +// FlushWithContext always returns nil +func (b *DummyQueue) FlushWithContext(context.Context) error { + return nil +} + +// IsEmpty asserts that the queue is empty +func (b *DummyQueue) IsEmpty() bool { + return true } var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue} @@ -123,7 +120,7 @@ func RegisteredTypesAsString() []string { return types } -// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error +// NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) { newFn, ok := queuesMap[queueType] if !ok { diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index c8f8a53804..45df8a443e 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,8 +7,6 @@ package queue import ( "context" "fmt" - "reflect" - "time" "code.gitea.io/gitea/modules/log" ) @@ -18,25 +16,23 @@ const ChannelQueueType Type = "channel" // ChannelQueueConfiguration is the configuration for a ChannelQueue type ChannelQueueConfiguration struct { - QueueLength int - BatchLength int - Workers int - MaxWorkers int - BlockTimeout time.Duration - BoostTimeout time.Duration - BoostWorkers int - Name string + WorkerPoolConfiguration + Workers int + Name string } -// ChannelQueue implements +// ChannelQueue implements Queue +// +// A channel queue is not persistable and does not shutdown or terminate cleanly +// It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { - pool *WorkerPool + *WorkerPool exemplar interface{} workers int name string } -// NewChannelQueue create a memory channel queue +// NewChannelQueue creates a memory channel queue func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg) if err != nil { @@ -46,26 +42,13 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro if config.BatchLength == 0 { config.BatchLength = 1 } - dataChan := make(chan Data, config.QueueLength) - - ctx, cancel := context.WithCancel(context.Background()) queue := &ChannelQueue{ - pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, - maxNumberOfWorkers: config.MaxWorkers, - }, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } - queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool) + queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } @@ -77,22 +60,18 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()) atTerminate(context.Background(), func() { log.Warn("ChannelQueue: %s is not terminatable!", c.name) }) + log.Debug("ChannelQueue: %s Starting", c.name) go func() { - _ = c.pool.AddWorkers(c.workers, 0) + _ = c.AddWorkers(c.workers, 0) }() } // Push will push data into the queue func (c *ChannelQueue) Push(data Data) error { - if c.exemplar != nil { - // Assert data is of same type as r.exemplar - t := reflect.TypeOf(data) - exemplarType := reflect.TypeOf(c.exemplar) - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) - } + if !assignableTo(data, c.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) } - c.pool.Push(data) + c.WorkerPool.Push(data) return nil } diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index fafc1e3303..8234b0f6f2 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -25,12 +25,14 @@ func TestChannelQueue(t *testing.T) { queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ - QueueLength: 20, - Workers: 1, - MaxWorkers: 10, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + MaxWorkers: 10, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, + Workers: 1, }, &testData{}) assert.NoError(t, err) @@ -60,13 +62,15 @@ func TestChannelQueue_Batch(t *testing.T) { queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ - QueueLength: 20, - BatchLength: 2, - Workers: 1, - MaxWorkers: 10, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 10, + }, + Workers: 1, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 98e7b24e42..ca3e230e3d 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -8,8 +8,8 @@ import ( "context" "encoding/json" "fmt" - "reflect" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -22,20 +22,15 @@ const LevelQueueType Type = "level" // LevelQueueConfiguration is the configuration for a LevelQueue type LevelQueueConfiguration struct { - DataDir string - QueueLength int - BatchLength int - Workers int - MaxWorkers int - BlockTimeout time.Duration - BoostTimeout time.Duration - BoostWorkers int - Name string + WorkerPoolConfiguration + DataDir string + Workers int + Name string } // LevelQueue implements a disk library queue type LevelQueue struct { - pool *WorkerPool + *WorkerPool queue *levelqueue.Queue closed chan struct{} terminated chan struct{} @@ -58,21 +53,8 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) return nil, err } - dataChan := make(chan Data, config.QueueLength) - ctx, cancel := context.WithCancel(context.Background()) - queue := &LevelQueue{ - pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, - maxNumberOfWorkers: config.MaxWorkers, - }, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), queue: internal, exemplar: exemplar, closed: make(chan struct{}), @@ -80,7 +62,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) workers: config.Workers, name: config.Name, } - queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool) + queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar) return queue, nil } @@ -88,9 +70,10 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), l.Shutdown) atTerminate(context.Background(), l.Terminate) + log.Debug("LevelQueue: %s Starting", l.name) go func() { - _ = l.pool.AddWorkers(l.workers, 0) + _ = l.AddWorkers(l.workers, 0) }() go l.readToChan() @@ -99,12 +82,12 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) <-l.closed log.Trace("LevelQueue: %s Waiting til done", l.name) - l.pool.Wait() + l.Wait() log.Trace("LevelQueue: %s Waiting til cleaned", l.name) ctx, cancel := context.WithCancel(context.Background()) atTerminate(ctx, cancel) - l.pool.CleanUp(ctx) + l.CleanUp(ctx) cancel() log.Trace("LevelQueue: %s Cleaned", l.name) @@ -115,56 +98,45 @@ func (l *LevelQueue) readToChan() { select { case <-l.closed: // tell the pool to shutdown. - l.pool.cancel() + l.cancel() return default: + atomic.AddInt64(&l.numInQueue, 1) bs, err := l.queue.RPop() if err != nil { if err != levelqueue.ErrNotFound { log.Error("LevelQueue: %s Error on RPop: %v", l.name, err) } + atomic.AddInt64(&l.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } if len(bs) == 0 { + atomic.AddInt64(&l.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } - var data Data - if l.exemplar != nil { - t := reflect.TypeOf(l.exemplar) - n := reflect.New(t) - ne := n.Elem() - err = json.Unmarshal(bs, ne.Addr().Interface()) - data = ne.Interface().(Data) - } else { - err = json.Unmarshal(bs, &data) - } + data, err := unmarshalAs(bs, l.exemplar) if err != nil { log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err) + atomic.AddInt64(&l.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } log.Trace("LevelQueue %s: Task found: %#v", l.name, data) - l.pool.Push(data) - + l.WorkerPool.Push(data) + atomic.AddInt64(&l.numInQueue, -1) } } } // Push will push the indexer data to queue func (l *LevelQueue) Push(data Data) error { - if l.exemplar != nil { - // Assert data is of same type as r.exemplar - value := reflect.ValueOf(data) - t := value.Type() - exemplarType := reflect.ValueOf(l.exemplar).Type() - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) - } + if !assignableTo(data, l.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name) } bs, err := json.Marshal(data) if err != nil { @@ -173,16 +145,25 @@ func (l *LevelQueue) Push(data Data) error { return l.queue.LPush(bs) } +// IsEmpty checks whether the queue is empty +func (l *LevelQueue) IsEmpty() bool { + if !l.WorkerPool.IsEmpty() { + return false + } + return l.queue.Len() == 0 +} + // Shutdown this queue and stop processing func (l *LevelQueue) Shutdown() { l.lock.Lock() defer l.lock.Unlock() - log.Trace("LevelQueue: %s Shutdown", l.name) + log.Trace("LevelQueue: %s Shutting down", l.name) select { case <-l.closed: default: close(l.closed) } + log.Debug("LevelQueue: %s Shutdown", l.name) } // Terminate this queue and close the queue @@ -196,11 +177,15 @@ func (l *LevelQueue) Terminate() { default: close(l.terminated) l.lock.Unlock() + if log.IsDebug() { + log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len()) + } if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { log.Error("Error whilst closing internal queue in %s: %v", l.name, err) } } + log.Debug("LevelQueue: %s Terminated", l.name) } // Name returns the name of this queue diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 6bb5a1be97..961187ab0d 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -6,7 +6,9 @@ package queue import ( "context" + "fmt" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -31,8 +33,10 @@ type PersistableChannelQueueConfiguration struct { } // PersistableChannelQueue wraps a channel queue and level queue together +// The disk level queue will be used to store data at shutdown and terminate - and will be restored +// on start up. type PersistableChannelQueue struct { - *ChannelQueue + channelQueue *ChannelQueue delayedStarter lock sync.Mutex closed chan struct{} @@ -48,14 +52,16 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( config := configInterface.(PersistableChannelQueueConfiguration) channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ - QueueLength: config.QueueLength, - BatchLength: config.BatchLength, - Workers: config.Workers, - MaxWorkers: config.MaxWorkers, - BlockTimeout: config.BlockTimeout, - BoostTimeout: config.BoostTimeout, - BoostWorkers: config.BoostWorkers, - Name: config.Name + "-channel", + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: config.BlockTimeout, + BoostTimeout: config.BoostTimeout, + BoostWorkers: config.BoostWorkers, + MaxWorkers: config.MaxWorkers, + }, + Workers: config.Workers, + Name: config.Name + "-channel", }, exemplar) if err != nil { return nil, err @@ -63,28 +69,30 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( // the level backend only needs temporary workers to catch up with the previously dropped work levelCfg := LevelQueueConfiguration{ - DataDir: config.DataDir, - QueueLength: config.QueueLength, - BatchLength: config.BatchLength, - Workers: 1, - MaxWorkers: 6, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - Name: config.Name + "-level", + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: config.QueueLength, + BatchLength: config.BatchLength, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 6, + }, + DataDir: config.DataDir, + Workers: 1, + Name: config.Name + "-level", } levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) if err == nil { queue := &PersistableChannelQueue{ - ChannelQueue: channelQueue.(*ChannelQueue), + channelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ internal: levelQueue.(*LevelQueue), name: config.Name, }, closed: make(chan struct{}), } - _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil) + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil } if IsErrInvalidConfiguration(err) { @@ -93,7 +101,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } queue := &PersistableChannelQueue{ - ChannelQueue: channelQueue.(*ChannelQueue), + channelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ cfg: levelCfg, underlying: LevelQueueType, @@ -103,7 +111,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( }, closed: make(chan struct{}), } - _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil) + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil } @@ -118,15 +126,17 @@ func (p *PersistableChannelQueue) Push(data Data) error { case <-p.closed: return p.internal.Push(data) default: - return p.ChannelQueue.Push(data) + return p.channelQueue.Push(data) } } // Run starts to run the queue func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name) + p.lock.Lock() if p.internal == nil { - err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar) + err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar) p.lock.Unlock() if err != nil { log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err) @@ -142,31 +152,83 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) go func() { - _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0) + _ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0) }() log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name) <-p.closed log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name) - p.ChannelQueue.pool.cancel() - p.internal.(*LevelQueue).pool.cancel() + p.channelQueue.cancel() + p.internal.(*LevelQueue).cancel() log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name) - p.ChannelQueue.pool.Wait() - p.internal.(*LevelQueue).pool.Wait() + p.channelQueue.Wait() + p.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel go func() { log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name) - for data := range p.ChannelQueue.pool.dataChan { + for data := range p.channelQueue.dataChan { _ = p.internal.Push(data) + atomic.AddInt64(&p.channelQueue.numInQueue, -1) } log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name) }() log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name) } +// Flush flushes the queue and blocks till the queue is empty +func (p *PersistableChannelQueue) Flush(timeout time.Duration) error { + var ctx context.Context + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } + defer cancel() + return p.FlushWithContext(ctx) +} + +// FlushWithContext flushes the queue and blocks till the queue is empty +func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error { + errChan := make(chan error, 1) + go func() { + errChan <- p.channelQueue.FlushWithContext(ctx) + }() + go func() { + p.lock.Lock() + if p.internal == nil { + p.lock.Unlock() + errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name()) + return + } + p.lock.Unlock() + errChan <- p.internal.FlushWithContext(ctx) + }() + err1 := <-errChan + err2 := <-errChan + + if err1 != nil { + return err1 + } + return err2 +} + +// IsEmpty checks if a queue is empty +func (p *PersistableChannelQueue) IsEmpty() bool { + if !p.channelQueue.IsEmpty() { + return false + } + p.lock.Lock() + defer p.lock.Unlock() + if p.internal == nil { + return false + } + return p.internal.IsEmpty() +} + // Shutdown processing this queue func (p *PersistableChannelQueue) Shutdown() { - log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name) select { case <-p.closed: default: @@ -177,6 +239,7 @@ func (p *PersistableChannelQueue) Shutdown() { } close(p.closed) } + log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) } // Terminate this queue and close the queue @@ -188,6 +251,7 @@ func (p *PersistableChannelQueue) Terminate() { if p.internal != nil { p.internal.(*LevelQueue).Terminate() } + log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name) } func init() { diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index c5959d606f..8600b8d868 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -32,14 +32,16 @@ func TestLevelQueue(t *testing.T) { defer os.RemoveAll(tmpDir) queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - Workers: 1, - MaxWorkers: 10, - QueueLength: 20, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 10, + }, + DataDir: tmpDir, + Workers: 1, }, &testData{}) assert.NoError(t, err) @@ -92,14 +94,16 @@ func TestLevelQueue(t *testing.T) { WrappedQueueConfiguration{ Underlying: LevelQueueType, Config: LevelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - Workers: 1, - MaxWorkers: 10, - QueueLength: 20, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + MaxWorkers: 10, + }, + DataDir: tmpDir, + Workers: 1, }, }, &testData{}) assert.NoError(t, err) diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 7d3efb9cff..0167c1ec49 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -9,9 +9,9 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "strings" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -25,13 +25,14 @@ const RedisQueueType Type = "redis" type redisClient interface { RPush(key string, args ...interface{}) *redis.IntCmd LPop(key string) *redis.StringCmd + LLen(key string) *redis.IntCmd Ping() *redis.StatusCmd Close() error } // RedisQueue redis queue type RedisQueue struct { - pool *WorkerPool + *WorkerPool client redisClient queueName string closed chan struct{} @@ -44,19 +45,14 @@ type RedisQueue struct { // RedisQueueConfiguration is the configuration for the redis queue type RedisQueueConfiguration struct { - Network string - Addresses string - Password string - DBIndex int - BatchLength int - QueueLength int - QueueName string - Workers int - MaxWorkers int - BlockTimeout time.Duration - BoostTimeout time.Duration - BoostWorkers int - Name string + WorkerPoolConfiguration + Network string + Addresses string + Password string + DBIndex int + QueueName string + Workers int + Name string } // NewRedisQueue creates single redis or cluster redis queue @@ -69,21 +65,8 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) dbs := strings.Split(config.Addresses, ",") - dataChan := make(chan Data, config.QueueLength) - ctx, cancel := context.WithCancel(context.Background()) - var queue = &RedisQueue{ - pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, - maxNumberOfWorkers: config.MaxWorkers, - }, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), queueName: config.QueueName, exemplar: exemplar, closed: make(chan struct{}), @@ -108,7 +91,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) if err := queue.client.Ping().Err(); err != nil { return nil, err } - queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool) + queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar) return queue, nil } @@ -117,9 +100,10 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), r.Shutdown) atTerminate(context.Background(), r.Terminate) + log.Debug("RedisQueue: %s Starting", r.name) go func() { - _ = r.pool.AddWorkers(r.workers, 0) + _ = r.AddWorkers(r.workers, 0) }() go r.readToChan() @@ -127,12 +111,12 @@ func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) log.Trace("RedisQueue: %s Waiting til closed", r.name) <-r.closed log.Trace("RedisQueue: %s Waiting til done", r.name) - r.pool.Wait() + r.Wait() log.Trace("RedisQueue: %s Waiting til cleaned", r.name) ctx, cancel := context.WithCancel(context.Background()) atTerminate(ctx, cancel) - r.pool.CleanUp(ctx) + r.CleanUp(ctx) cancel() } @@ -141,53 +125,43 @@ func (r *RedisQueue) readToChan() { select { case <-r.closed: // tell the pool to shutdown - r.pool.cancel() + r.cancel() return default: + atomic.AddInt64(&r.numInQueue, 1) bs, err := r.client.LPop(r.queueName).Bytes() if err != nil && err != redis.Nil { log.Error("RedisQueue: %s Error on LPop: %v", r.name, err) + atomic.AddInt64(&r.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } if len(bs) == 0 { + atomic.AddInt64(&r.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } - var data Data - if r.exemplar != nil { - t := reflect.TypeOf(r.exemplar) - n := reflect.New(t) - ne := n.Elem() - err = json.Unmarshal(bs, ne.Addr().Interface()) - data = ne.Interface().(Data) - } else { - err = json.Unmarshal(bs, &data) - } + data, err := unmarshalAs(bs, r.exemplar) if err != nil { log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err) + atomic.AddInt64(&r.numInQueue, -1) time.Sleep(time.Millisecond * 100) continue } log.Trace("RedisQueue: %s Task found: %#v", r.name, data) - r.pool.Push(data) + r.WorkerPool.Push(data) + atomic.AddInt64(&r.numInQueue, -1) } } } // Push implements Queue func (r *RedisQueue) Push(data Data) error { - if r.exemplar != nil { - // Assert data is of same type as r.exemplar - value := reflect.ValueOf(data) - t := value.Type() - exemplarType := reflect.ValueOf(r.exemplar).Type() - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) - } + if !assignableTo(data, r.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name) } bs, err := json.Marshal(data) if err != nil { @@ -196,9 +170,22 @@ func (r *RedisQueue) Push(data Data) error { return r.client.RPush(r.queueName, bs).Err() } +// IsEmpty checks if the queue is empty +func (r *RedisQueue) IsEmpty() bool { + if !r.WorkerPool.IsEmpty() { + return false + } + length, err := r.client.LLen(r.queueName).Result() + if err != nil { + log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err) + return false + } + return length == 0 +} + // Shutdown processing from this queue func (r *RedisQueue) Shutdown() { - log.Trace("Shutdown: %s", r.name) + log.Trace("RedisQueue: %s Shutting down", r.name) r.lock.Lock() select { case <-r.closed: @@ -206,11 +193,12 @@ func (r *RedisQueue) Shutdown() { close(r.closed) } r.lock.Unlock() + log.Debug("RedisQueue: %s Shutdown", r.name) } // Terminate this queue and close the queue func (r *RedisQueue) Terminate() { - log.Trace("Terminating: %s", r.name) + log.Trace("RedisQueue: %s Terminating", r.name) r.Shutdown() r.lock.Lock() select { @@ -219,10 +207,14 @@ func (r *RedisQueue) Terminate() { default: close(r.terminated) r.lock.Unlock() + if log.IsDebug() { + log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName)) + } if err := r.client.Close(); err != nil { log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) } } + log.Debug("RedisQueue: %s Terminated", r.name) } // Name returns the name of this queue diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index c52e6e4673..ef90d18608 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -7,8 +7,8 @@ package queue import ( "context" "fmt" - "reflect" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -56,7 +56,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h for q.internal == nil { select { case <-ctx.Done(): - return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) + return fmt.Errorf("Timedout creating queue %v with cfg %s in %s", q.underlying, q.cfg, q.name) default: queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) if err == nil { @@ -64,11 +64,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h break } if err.Error() != "resource temporarily unavailable" { - log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %v error: %v", i, q.underlying, q.name, q.cfg, err) + log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %s error: %v", i, q.underlying, q.name, q.cfg, err) } i++ if q.maxAttempts > 0 && i > q.maxAttempts { - return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) + return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, q.cfg, err) } sleepTime := 100 * time.Millisecond if q.timeout > 0 && q.maxAttempts > 0 { @@ -88,10 +88,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h // WrappedQueue wraps a delayed starting queue type WrappedQueue struct { delayedStarter - lock sync.Mutex - handle HandlerFunc - exemplar interface{} - channel chan Data + lock sync.Mutex + handle HandlerFunc + exemplar interface{} + channel chan Data + numInQueue int64 } // NewWrappedQueue will attempt to create a queue of the provided type, @@ -127,7 +128,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro name: config.Name, }, } - _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil) + _ = GetManager().Add(queue, WrappedQueueType, config, exemplar) return queue, nil } @@ -138,21 +139,78 @@ func (q *WrappedQueue) Name() string { // Push will push the data to the internal channel checking it against the exemplar func (q *WrappedQueue) Push(data Data) error { - if q.exemplar != nil { - // Assert data is of same type as r.exemplar - value := reflect.ValueOf(data) - t := value.Type() - exemplarType := reflect.ValueOf(q.exemplar).Type() - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) - } + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) } + atomic.AddInt64(&q.numInQueue, 1) q.channel <- data return nil } +func (q *WrappedQueue) flushInternalWithContext(ctx context.Context) error { + q.lock.Lock() + if q.internal == nil { + q.lock.Unlock() + return fmt.Errorf("not ready to flush wrapped queue %s yet", q.Name()) + } + q.lock.Unlock() + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return q.internal.FlushWithContext(ctx) +} + +// Flush flushes the queue and blocks till the queue is empty +func (q *WrappedQueue) Flush(timeout time.Duration) error { + var ctx context.Context + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext implements the final part of Flushable +func (q *WrappedQueue) FlushWithContext(ctx context.Context) error { + log.Trace("WrappedQueue: %s FlushWithContext", q.Name()) + errChan := make(chan error, 1) + go func() { + errChan <- q.flushInternalWithContext(ctx) + close(errChan) + }() + + select { + case err := <-errChan: + return err + case <-ctx.Done(): + go func() { + <-errChan + }() + return ctx.Err() + } +} + +// IsEmpty checks whether the queue is empty +func (q *WrappedQueue) IsEmpty() bool { + if atomic.LoadInt64(&q.numInQueue) != 0 { + return false + } + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return false + } + return q.internal.IsEmpty() +} + // Run starts to run the queue and attempts to create the internal queue func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { + log.Debug("WrappedQueue: %s Starting", q.name) q.lock.Lock() if q.internal == nil { err := q.setInternal(atShutdown, q.handle, q.exemplar) @@ -164,6 +222,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()) go func() { for data := range q.channel { _ = q.internal.Push(data) + atomic.AddInt64(&q.numInQueue, -1) } }() } else { @@ -176,7 +235,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()) // Shutdown this queue and stop processing func (q *WrappedQueue) Shutdown() { - log.Trace("WrappedQueue: %s Shutdown", q.name) + log.Trace("WrappedQueue: %s Shutting down", q.name) q.lock.Lock() defer q.lock.Unlock() if q.internal == nil { @@ -185,6 +244,7 @@ func (q *WrappedQueue) Shutdown() { if shutdownable, ok := q.internal.(Shutdownable); ok { shutdownable.Shutdown() } + log.Debug("WrappedQueue: %s Shutdown", q.name) } // Terminate this queue and close the queue @@ -198,6 +258,7 @@ func (q *WrappedQueue) Terminate() { if shutdownable, ok := q.internal.(Shutdownable); ok { shutdownable.Terminate() } + log.Debug("WrappedQueue: %s Terminated", q.name) } func init() { diff --git a/modules/queue/setting.go b/modules/queue/setting.go index d5a6b41882..8760c09ae8 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -24,8 +24,7 @@ func validType(t string) (Type, error) { return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType)) } -// CreateQueue for name with provided handler and exemplar -func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { +func getQueueSettings(name string) (setting.QueueSettings, []byte) { q := setting.GetQueueSettings(name) opts := make(map[string]interface{}) opts["Name"] = name @@ -43,24 +42,33 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { opts["BoostTimeout"] = q.BoostTimeout opts["BoostWorkers"] = q.BoostWorkers - typ, err := validType(q.Type) - if err != nil { - log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) - } - cfg, err := json.Marshal(opts) if err != nil { log.Error("Unable to marshall generic options: %v Error: %v", opts, err) log.Error("Unable to create queue for %s", name, err) + return q, []byte{} + } + return q, cfg +} + +// CreateQueue for name with provided handler and exemplar +func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { + q, cfg := getQueueSettings(name) + if len(cfg) == 0 { return nil } + typ, err := validType(q.Type) + if err != nil { + log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) + } + returnable, err := NewQueue(typ, handle, cfg, exemplar) if q.WrapIfNecessary && err != nil { log.Warn("Unable to create queue for %s: %v", name, err) log.Warn("Attempting to create wrapped queue") returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{ - Underlying: Type(q.Type), + Underlying: typ, Timeout: q.Timeout, MaxAttempts: q.MaxAttempts, Config: cfg, diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 25fc7dd644..63ec897481 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -7,12 +7,16 @@ package queue import ( "context" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" ) -// WorkerPool takes +// WorkerPool represent a dynamically growable worker pool for a +// provided handler function. They have an internal channel which +// they use to detect if there is a block and will grow and shrink in +// response to demand as per configuration. type WorkerPool struct { lock sync.Mutex baseCtx context.Context @@ -27,10 +31,42 @@ type WorkerPool struct { blockTimeout time.Duration boostTimeout time.Duration boostWorkers int + numInQueue int64 +} + +// WorkerPoolConfiguration is the basic configuration for a WorkerPool +type WorkerPoolConfiguration struct { + QueueLength int + BatchLength int + BlockTimeout time.Duration + BoostTimeout time.Duration + BoostWorkers int + MaxWorkers int +} + +// NewWorkerPool creates a new worker pool +func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool { + ctx, cancel := context.WithCancel(context.Background()) + + dataChan := make(chan Data, config.QueueLength) + pool := &WorkerPool{ + baseCtx: ctx, + cancel: cancel, + batchLength: config.BatchLength, + dataChan: dataChan, + handle: handle, + blockTimeout: config.BlockTimeout, + boostTimeout: config.BoostTimeout, + boostWorkers: config.BoostWorkers, + maxNumberOfWorkers: config.MaxWorkers, + } + + return pool } // Push pushes the data to the internal channel func (p *WorkerPool) Push(data Data) { + atomic.AddInt64(&p.numInQueue, 1) p.lock.Lock() if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { p.lock.Unlock() @@ -80,7 +116,7 @@ func (p *WorkerPool) pushBoost(data Data) { log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, false, start, cancel) + pid := mq.RegisterWorkers(boost, start, false, start, cancel, false) go func() { <-ctx.Done() mq.RemoveWorkers(pid) @@ -138,8 +174,8 @@ func (p *WorkerPool) BlockTimeout() time.Duration { return p.blockTimeout } -// SetSettings sets the setable boost values -func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { +// SetPoolSettings sets the setable boost values +func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { p.lock.Lock() defer p.lock.Unlock() p.maxNumberOfWorkers = maxNumberOfWorkers @@ -156,8 +192,7 @@ func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) { p.maxNumberOfWorkers = newMax } -// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit -func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { +func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) { var ctx context.Context var cancel context.CancelFunc start := time.Now() @@ -173,7 +208,7 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance mq := GetManager().GetManagedQueue(p.qid) if mq != nil { - pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel) + pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher) go func() { <-ctx.Done() mq.RemoveWorkers(pid) @@ -184,6 +219,12 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) } + return ctx, cancel +} + +// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit +func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { + ctx, cancel := p.commonRegisterWorkers(number, timeout, false) p.addWorkers(ctx, number) return cancel } @@ -235,6 +276,7 @@ func (p *WorkerPool) CleanUp(ctx context.Context) { close(p.dataChan) for data := range p.dataChan { p.handle(data) + atomic.AddInt64(&p.numInQueue, -1) select { case <-ctx.Done(): log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid) @@ -245,6 +287,37 @@ func (p *WorkerPool) CleanUp(ctx context.Context) { log.Trace("WorkerPool: %d CleanUp Done", p.qid) } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (p *WorkerPool) Flush(timeout time.Duration) error { + ctx, cancel := p.commonRegisterWorkers(1, timeout, true) + defer cancel() + return p.FlushWithContext(ctx) +} + +// IsEmpty returns if true if the worker queue is empty +func (p *WorkerPool) IsEmpty() bool { + return atomic.LoadInt64(&p.numInQueue) == 0 +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +// NB: The worker will not be registered with the manager. +func (p *WorkerPool) FlushWithContext(ctx context.Context) error { + log.Trace("WorkerPool: %d Flush", p.qid) + for { + select { + case data := <-p.dataChan: + p.handle(data) + atomic.AddInt64(&p.numInQueue, -1) + case <-p.baseCtx.Done(): + return p.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + func (p *WorkerPool) doWork(ctx context.Context) { delay := time.Millisecond * 300 var data = make([]Data, 0, p.batchLength) @@ -254,6 +327,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) p.handle(data...) + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") return @@ -263,6 +337,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) p.handle(data...) + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") return @@ -271,6 +346,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if len(data) >= p.batchLength { log.Trace("Handling: %d data, %v", len(data), data) p.handle(data...) + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) data = make([]Data, 0, p.batchLength) } default: @@ -286,6 +362,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) p.handle(data...) + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") return @@ -301,6 +378,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) p.handle(data...) + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") return @@ -309,6 +387,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if len(data) >= p.batchLength { log.Trace("Handling: %d data, %v", len(data), data) p.handle(data...) + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) data = make([]Data, 0, p.batchLength) } case <-timer.C: @@ -316,6 +395,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) p.handle(data...) + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) data = make([]Data, 0, p.batchLength) } |