summaryrefslogtreecommitdiffstats
path: root/modules/queue/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/manager.go')
-rw-r--r--modules/queue/manager.go159
1 files changed, 129 insertions, 30 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 88b2644848..a6734787a9 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -26,36 +26,57 @@ type Manager struct {
Queues map[int64]*ManagedQueue
}
-// ManagedQueue represents a working queue inheriting from Gitea.
+// ManagedQueue represents a working queue with a Pool of workers.
+//
+// Although a ManagedQueue should really represent a Queue this does not
+// necessarily have to be the case. This could be used to describe any queue.WorkerPool.
type ManagedQueue struct {
mutex sync.Mutex
QID int64
- Queue Queue
Type Type
Name string
Configuration interface{}
ExemplarType string
- Pool ManagedPool
+ Managed interface{}
counter int64
PoolWorkers map[int64]*PoolWorkers
}
+// Flushable represents a pool or queue that is flushable
+type Flushable interface {
+ // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
+ Flush(time.Duration) error
+ // FlushWithContext is very similar to Flush
+ // NB: The worker will not be registered with the manager.
+ FlushWithContext(ctx context.Context) error
+ // IsEmpty will return if the managed pool is empty and has no work
+ IsEmpty() bool
+}
+
// ManagedPool is a simple interface to get certain details from a worker pool
type ManagedPool interface {
+ // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
AddWorkers(number int, timeout time.Duration) context.CancelFunc
+ // NumberOfWorkers returns the total number of workers in the pool
NumberOfWorkers() int
+ // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
MaxNumberOfWorkers() int
+ // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
SetMaxNumberOfWorkers(int)
+ // BoostTimeout returns the current timeout for worker groups created during a boost
BoostTimeout() time.Duration
+ // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
BlockTimeout() time.Duration
+ // BoostWorkers sets the number of workers to be created during a boost
BoostWorkers() int
- SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+ // SetPoolSettings sets the user updatable settings for the pool
+ SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
}
// ManagedQueueList implements the sort.Interface
type ManagedQueueList []*ManagedQueue
-// PoolWorkers represents a working queue inheriting from Gitea.
+// PoolWorkers represents a group of workers working on a queue
type PoolWorkers struct {
PID int64
Workers int
@@ -63,9 +84,10 @@ type PoolWorkers struct {
Timeout time.Time
HasTimeout bool
Cancel context.CancelFunc
+ IsFlusher bool
}
-// PoolWorkersList implements the sort.Interface
+// PoolWorkersList implements the sort.Interface for PoolWorkers
type PoolWorkersList []*PoolWorkers
func init() {
@@ -83,27 +105,28 @@ func GetManager() *Manager {
}
// Add adds a queue to this manager
-func (m *Manager) Add(queue Queue,
+func (m *Manager) Add(managed interface{},
t Type,
configuration,
- exemplar interface{},
- pool ManagedPool) int64 {
+ exemplar interface{}) int64 {
cfg, _ := json.Marshal(configuration)
mq := &ManagedQueue{
- Queue: queue,
Type: t,
Configuration: string(cfg),
ExemplarType: reflect.TypeOf(exemplar).String(),
PoolWorkers: make(map[int64]*PoolWorkers),
- Pool: pool,
+ Managed: managed,
}
m.mutex.Lock()
m.counter++
mq.QID = m.counter
mq.Name = fmt.Sprintf("queue-%d", mq.QID)
- if named, ok := queue.(Named); ok {
- mq.Name = named.Name()
+ if named, ok := managed.(Named); ok {
+ name := named.Name()
+ if len(name) > 0 {
+ mq.Name = name
+ }
}
m.Queues[mq.QID] = mq
m.mutex.Unlock()
@@ -127,6 +150,64 @@ func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
return m.Queues[qid]
}
+// FlushAll flushes all the flushable queues attached to this manager
+func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
+ var ctx context.Context
+ var cancel context.CancelFunc
+ start := time.Now()
+ end := start
+ hasTimeout := false
+ if timeout > 0 {
+ ctx, cancel = context.WithTimeout(baseCtx, timeout)
+ end = start.Add(timeout)
+ hasTimeout = true
+ } else {
+ ctx, cancel = context.WithCancel(baseCtx)
+ }
+ defer cancel()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ mqs := m.ManagedQueues()
+ wg := sync.WaitGroup{}
+ wg.Add(len(mqs))
+ allEmpty := true
+ for _, mq := range mqs {
+ if mq.IsEmpty() {
+ wg.Done()
+ continue
+ }
+ allEmpty = false
+ if flushable, ok := mq.Managed.(Flushable); ok {
+ go func() {
+ localCtx, localCancel := context.WithCancel(ctx)
+ pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
+ err := flushable.FlushWithContext(localCtx)
+ if err != nil && err != ctx.Err() {
+ cancel()
+ }
+ mq.CancelWorkers(pid)
+ localCancel()
+ wg.Done()
+ }()
+ } else {
+ wg.Done()
+ }
+
+ }
+ if allEmpty {
+ break
+ }
+ wg.Wait()
+ }
+ return nil
+
+}
+
// ManagedQueues returns the managed queues
func (m *Manager) ManagedQueues() []*ManagedQueue {
m.mutex.Lock()
@@ -152,7 +233,7 @@ func (q *ManagedQueue) Workers() []*PoolWorkers {
}
// RegisterWorkers registers workers to this queue
-func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
+func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
q.mutex.Lock()
defer q.mutex.Unlock()
q.counter++
@@ -163,6 +244,7 @@ func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout b
Timeout: timeout,
HasTimeout: hasTimeout,
Cancel: cancel,
+ IsFlusher: isFlusher,
}
return q.counter
}
@@ -191,57 +273,74 @@ func (q *ManagedQueue) RemoveWorkers(pid int64) {
// AddWorkers adds workers to the queue if it has registered an add worker function
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
- if q.Pool != nil {
+ if pool, ok := q.Managed.(ManagedPool); ok {
// the cancel will be added to the pool workers description above
- return q.Pool.AddWorkers(number, timeout)
+ return pool.AddWorkers(number, timeout)
}
return nil
}
+// Flush flushes the queue with a timeout
+func (q *ManagedQueue) Flush(timeout time.Duration) error {
+ if flushable, ok := q.Managed.(Flushable); ok {
+ // the cancel will be added to the pool workers description above
+ return flushable.Flush(timeout)
+ }
+ return nil
+}
+
+// IsEmpty returns if the queue is empty
+func (q *ManagedQueue) IsEmpty() bool {
+ if flushable, ok := q.Managed.(Flushable); ok {
+ return flushable.IsEmpty()
+ }
+ return true
+}
+
// NumberOfWorkers returns the number of workers in the queue
func (q *ManagedQueue) NumberOfWorkers() int {
- if q.Pool != nil {
- return q.Pool.NumberOfWorkers()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.NumberOfWorkers()
}
return -1
}
// MaxNumberOfWorkers returns the maximum number of workers for the pool
func (q *ManagedQueue) MaxNumberOfWorkers() int {
- if q.Pool != nil {
- return q.Pool.MaxNumberOfWorkers()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.MaxNumberOfWorkers()
}
return 0
}
// BoostWorkers returns the number of workers for a boost
func (q *ManagedQueue) BoostWorkers() int {
- if q.Pool != nil {
- return q.Pool.BoostWorkers()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.BoostWorkers()
}
return -1
}
// BoostTimeout returns the timeout of the next boost
func (q *ManagedQueue) BoostTimeout() time.Duration {
- if q.Pool != nil {
- return q.Pool.BoostTimeout()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.BoostTimeout()
}
return 0
}
// BlockTimeout returns the timeout til the next boost
func (q *ManagedQueue) BlockTimeout() time.Duration {
- if q.Pool != nil {
- return q.Pool.BlockTimeout()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.BlockTimeout()
}
return 0
}
-// SetSettings sets the setable boost values
-func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
- if q.Pool != nil {
- q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
+// SetPoolSettings sets the setable boost values
+func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
}
}