diff options
Diffstat (limited to 'modules/queue/manager.go')
-rw-r--r-- | modules/queue/manager.go | 159 |
1 files changed, 129 insertions, 30 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 88b2644848..a6734787a9 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -26,36 +26,57 @@ type Manager struct { Queues map[int64]*ManagedQueue } -// ManagedQueue represents a working queue inheriting from Gitea. +// ManagedQueue represents a working queue with a Pool of workers. +// +// Although a ManagedQueue should really represent a Queue this does not +// necessarily have to be the case. This could be used to describe any queue.WorkerPool. type ManagedQueue struct { mutex sync.Mutex QID int64 - Queue Queue Type Type Name string Configuration interface{} ExemplarType string - Pool ManagedPool + Managed interface{} counter int64 PoolWorkers map[int64]*PoolWorkers } +// Flushable represents a pool or queue that is flushable +type Flushable interface { + // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager + Flush(time.Duration) error + // FlushWithContext is very similar to Flush + // NB: The worker will not be registered with the manager. + FlushWithContext(ctx context.Context) error + // IsEmpty will return if the managed pool is empty and has no work + IsEmpty() bool +} + // ManagedPool is a simple interface to get certain details from a worker pool type ManagedPool interface { + // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group AddWorkers(number int, timeout time.Duration) context.CancelFunc + // NumberOfWorkers returns the total number of workers in the pool NumberOfWorkers() int + // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to MaxNumberOfWorkers() int + // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to SetMaxNumberOfWorkers(int) + // BoostTimeout returns the current timeout for worker groups created during a boost BoostTimeout() time.Duration + // BlockTimeout returns the timeout the internal channel can block for before a boost would occur BlockTimeout() time.Duration + // BoostWorkers sets the number of workers to be created during a boost BoostWorkers() int - SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + // SetPoolSettings sets the user updatable settings for the pool + SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) } // ManagedQueueList implements the sort.Interface type ManagedQueueList []*ManagedQueue -// PoolWorkers represents a working queue inheriting from Gitea. +// PoolWorkers represents a group of workers working on a queue type PoolWorkers struct { PID int64 Workers int @@ -63,9 +84,10 @@ type PoolWorkers struct { Timeout time.Time HasTimeout bool Cancel context.CancelFunc + IsFlusher bool } -// PoolWorkersList implements the sort.Interface +// PoolWorkersList implements the sort.Interface for PoolWorkers type PoolWorkersList []*PoolWorkers func init() { @@ -83,27 +105,28 @@ func GetManager() *Manager { } // Add adds a queue to this manager -func (m *Manager) Add(queue Queue, +func (m *Manager) Add(managed interface{}, t Type, configuration, - exemplar interface{}, - pool ManagedPool) int64 { + exemplar interface{}) int64 { cfg, _ := json.Marshal(configuration) mq := &ManagedQueue{ - Queue: queue, Type: t, Configuration: string(cfg), ExemplarType: reflect.TypeOf(exemplar).String(), PoolWorkers: make(map[int64]*PoolWorkers), - Pool: pool, + Managed: managed, } m.mutex.Lock() m.counter++ mq.QID = m.counter mq.Name = fmt.Sprintf("queue-%d", mq.QID) - if named, ok := queue.(Named); ok { - mq.Name = named.Name() + if named, ok := managed.(Named); ok { + name := named.Name() + if len(name) > 0 { + mq.Name = name + } } m.Queues[mq.QID] = mq m.mutex.Unlock() @@ -127,6 +150,64 @@ func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue { return m.Queues[qid] } +// FlushAll flushes all the flushable queues attached to this manager +func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error { + var ctx context.Context + var cancel context.CancelFunc + start := time.Now() + end := start + hasTimeout := false + if timeout > 0 { + ctx, cancel = context.WithTimeout(baseCtx, timeout) + end = start.Add(timeout) + hasTimeout = true + } else { + ctx, cancel = context.WithCancel(baseCtx) + } + defer cancel() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + mqs := m.ManagedQueues() + wg := sync.WaitGroup{} + wg.Add(len(mqs)) + allEmpty := true + for _, mq := range mqs { + if mq.IsEmpty() { + wg.Done() + continue + } + allEmpty = false + if flushable, ok := mq.Managed.(Flushable); ok { + go func() { + localCtx, localCancel := context.WithCancel(ctx) + pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) + err := flushable.FlushWithContext(localCtx) + if err != nil && err != ctx.Err() { + cancel() + } + mq.CancelWorkers(pid) + localCancel() + wg.Done() + }() + } else { + wg.Done() + } + + } + if allEmpty { + break + } + wg.Wait() + } + return nil + +} + // ManagedQueues returns the managed queues func (m *Manager) ManagedQueues() []*ManagedQueue { m.mutex.Lock() @@ -152,7 +233,7 @@ func (q *ManagedQueue) Workers() []*PoolWorkers { } // RegisterWorkers registers workers to this queue -func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 { +func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 { q.mutex.Lock() defer q.mutex.Unlock() q.counter++ @@ -163,6 +244,7 @@ func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout b Timeout: timeout, HasTimeout: hasTimeout, Cancel: cancel, + IsFlusher: isFlusher, } return q.counter } @@ -191,57 +273,74 @@ func (q *ManagedQueue) RemoveWorkers(pid int64) { // AddWorkers adds workers to the queue if it has registered an add worker function func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc { - if q.Pool != nil { + if pool, ok := q.Managed.(ManagedPool); ok { // the cancel will be added to the pool workers description above - return q.Pool.AddWorkers(number, timeout) + return pool.AddWorkers(number, timeout) } return nil } +// Flush flushes the queue with a timeout +func (q *ManagedQueue) Flush(timeout time.Duration) error { + if flushable, ok := q.Managed.(Flushable); ok { + // the cancel will be added to the pool workers description above + return flushable.Flush(timeout) + } + return nil +} + +// IsEmpty returns if the queue is empty +func (q *ManagedQueue) IsEmpty() bool { + if flushable, ok := q.Managed.(Flushable); ok { + return flushable.IsEmpty() + } + return true +} + // NumberOfWorkers returns the number of workers in the queue func (q *ManagedQueue) NumberOfWorkers() int { - if q.Pool != nil { - return q.Pool.NumberOfWorkers() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.NumberOfWorkers() } return -1 } // MaxNumberOfWorkers returns the maximum number of workers for the pool func (q *ManagedQueue) MaxNumberOfWorkers() int { - if q.Pool != nil { - return q.Pool.MaxNumberOfWorkers() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.MaxNumberOfWorkers() } return 0 } // BoostWorkers returns the number of workers for a boost func (q *ManagedQueue) BoostWorkers() int { - if q.Pool != nil { - return q.Pool.BoostWorkers() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.BoostWorkers() } return -1 } // BoostTimeout returns the timeout of the next boost func (q *ManagedQueue) BoostTimeout() time.Duration { - if q.Pool != nil { - return q.Pool.BoostTimeout() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.BoostTimeout() } return 0 } // BlockTimeout returns the timeout til the next boost func (q *ManagedQueue) BlockTimeout() time.Duration { - if q.Pool != nil { - return q.Pool.BlockTimeout() + if pool, ok := q.Managed.(ManagedPool); ok { + return pool.BlockTimeout() } return 0 } -// SetSettings sets the setable boost values -func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { - if q.Pool != nil { - q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout) +// SetPoolSettings sets the setable boost values +func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { + if pool, ok := q.Managed.(ManagedPool); ok { + pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout) } } |