author    zeripath <art27@cantab.net>    2020-01-29 01:01:06 +0000
committer GitHub <noreply@github.com>    2020-01-28 20:01:06 -0500
commit    c01221e70fc71f5bcff5f699095fbcbfc1e2b4a3 (patch)
tree      4017848a786da2080e9a003a77bd40bd81625680 /modules/queue
parent    7c84dbca4f0f79dc90752105800a6964693283bd (diff)
Queue: Make WorkerPools and Queues flushable (#10001)
* Make WorkerPools and Queues flushable

  Adds Flush methods to Queues and the WorkerPool
  Further abstracts the WorkerPool
  Adds a final step to Flush the queues in the defer from PrintCurrentTest
  Fixes an issue with Settings inheritance in queues

  Signed-off-by: Andrew Thornton <art27@cantab.net>

* Change to for loop
* Add IsEmpty and begin just making the queues composed WorkerPools
* subsume workerpool into the queues and create a flushable interface
* Add manager command
* Move flushall to queue.Manager and add to testlogger
* As per @guillep2k
* as per @guillep2k
* Just make queues all implement flushable and clean up the wrapped queue flushes
* cope with no timeout

Co-authored-by: Lauris BH <lauris@nix.lv>
Diffstat (limited to 'modules/queue')
-rw-r--r--  modules/queue/helper.go               63
-rw-r--r--  modules/queue/manager.go             159
-rw-r--r--  modules/queue/queue.go                43
-rw-r--r--  modules/queue/queue_channel.go        59
-rw-r--r--  modules/queue/queue_channel_test.go   30
-rw-r--r--  modules/queue/queue_disk.go           87
-rw-r--r--  modules/queue/queue_disk_channel.go  126
-rw-r--r--  modules/queue/queue_disk_test.go      36
-rw-r--r--  modules/queue/queue_redis.go         102
-rw-r--r--  modules/queue/queue_wrapped.go        97
-rw-r--r--  modules/queue/setting.go              24
-rw-r--r--  modules/queue/workerpool.go           94
12 files changed, 628 insertions, 292 deletions
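
The changes below introduce a Flushable interface and a Manager.FlushAll method that drains every registered queue. As a minimal sketch of how a caller outside this package might use the new API (the helper name is illustrative; the import path is the module shown in the diff):

```go
package main

import (
	"context"
	"time"

	"code.gitea.io/gitea/modules/queue"
)

// flushAllQueues asks the queue manager to drain every flushable queue,
// giving up after the supplied timeout. A non-positive timeout means
// "wait until everything is empty".
func flushAllQueues(timeout time.Duration) error {
	// FlushAll registers a temporary flush worker per non-empty queue and
	// loops until every managed queue reports IsEmpty().
	return queue.GetManager().FlushAll(context.Background(), timeout)
}
```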
diff --git a/modules/queue/helper.go b/modules/queue/helper.go
new file mode 100644
index 0000000000..e6fb1b94f9
--- /dev/null
+++ b/modules/queue/helper.go
@@ -0,0 +1,63 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "encoding/json"
+ "reflect"
+)
+
+// toConfig will attempt to convert a given configuration cfg into the provided exemplar type.
+//
+// It will tolerate the cfg being passed as a []byte or string of a json representation of the
+// exemplar or the correct type of the exemplar itself
+func toConfig(exemplar, cfg interface{}) (interface{}, error) {
+ if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
+ return cfg, nil
+ }
+
+ configBytes, ok := cfg.([]byte)
+ if !ok {
+ configStr, ok := cfg.(string)
+ if !ok {
+ return nil, ErrInvalidConfiguration{cfg: cfg}
+ }
+ configBytes = []byte(configStr)
+ }
+ newVal := reflect.New(reflect.TypeOf(exemplar))
+ if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
+ return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
+ }
+ return newVal.Elem().Interface(), nil
+}
+
+// unmarshalAs will attempt to unmarshal provided bytes as the provided exemplar
+func unmarshalAs(bs []byte, exemplar interface{}) (data Data, err error) {
+ if exemplar != nil {
+ t := reflect.TypeOf(exemplar)
+ n := reflect.New(t)
+ ne := n.Elem()
+ err = json.Unmarshal(bs, ne.Addr().Interface())
+ data = ne.Interface().(Data)
+ } else {
+ err = json.Unmarshal(bs, &data)
+ }
+
+ return
+}
+
+// assignableTo will check if provided data is assignable to the same type as the exemplar
+// if the provided exemplar is nil then it will always return true
+func assignableTo(data Data, exemplar interface{}) bool {
+ if exemplar == nil {
+ return true
+ }
+
+ // Assert data is of same type as exemplar
+ t := reflect.TypeOf(data)
+ exemplarType := reflect.TypeOf(exemplar)
+
+ return t.AssignableTo(exemplarType) && data != nil
+}
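
The helpers above are unexported and only used inside the package. The sketch below, with a hypothetical exampleConfiguration struct, restates the pattern the queue constructors follow when normalising their configuration through toConfig:

```go
package queue

// exampleConfiguration is a hypothetical configuration struct, used only to
// illustrate how the constructors in this package consume toConfig.
type exampleConfiguration struct {
	Name    string
	Workers int
}

// decodeExampleConfig accepts either the concrete struct or its JSON form
// (as []byte or string) and normalises it into the concrete type.
func decodeExampleConfig(cfg interface{}) (exampleConfiguration, error) {
	configInterface, err := toConfig(exampleConfiguration{}, cfg)
	if err != nil {
		return exampleConfiguration{}, err
	}
	return configInterface.(exampleConfiguration), nil
}
```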
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 88b2644848..a6734787a9 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -26,36 +26,57 @@ type Manager struct {
Queues map[int64]*ManagedQueue
}
-// ManagedQueue represents a working queue inheriting from Gitea.
+// ManagedQueue represents a working queue with a Pool of workers.
+//
+// Although a ManagedQueue should really represent a Queue this does not
+// necessarily have to be the case. This could be used to describe any queue.WorkerPool.
type ManagedQueue struct {
mutex sync.Mutex
QID int64
- Queue Queue
Type Type
Name string
Configuration interface{}
ExemplarType string
- Pool ManagedPool
+ Managed interface{}
counter int64
PoolWorkers map[int64]*PoolWorkers
}
+// Flushable represents a pool or queue that is flushable
+type Flushable interface {
+ // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
+ Flush(time.Duration) error
+ // FlushWithContext is very similar to Flush
+ // NB: The worker will not be registered with the manager.
+ FlushWithContext(ctx context.Context) error
+ // IsEmpty will return if the managed pool is empty and has no work
+ IsEmpty() bool
+}
+
// ManagedPool is a simple interface to get certain details from a worker pool
type ManagedPool interface {
+ // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
AddWorkers(number int, timeout time.Duration) context.CancelFunc
+ // NumberOfWorkers returns the total number of workers in the pool
NumberOfWorkers() int
+ // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
MaxNumberOfWorkers() int
+ // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
SetMaxNumberOfWorkers(int)
+ // BoostTimeout returns the current timeout for worker groups created during a boost
BoostTimeout() time.Duration
+ // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
BlockTimeout() time.Duration
+ // BoostWorkers sets the number of workers to be created during a boost
BoostWorkers() int
- SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+ // SetPoolSettings sets the user updatable settings for the pool
+ SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
}
// ManagedQueueList implements the sort.Interface
type ManagedQueueList []*ManagedQueue
-// PoolWorkers represents a working queue inheriting from Gitea.
+// PoolWorkers represents a group of workers working on a queue
type PoolWorkers struct {
PID int64
Workers int
@@ -63,9 +84,10 @@ type PoolWorkers struct {
Timeout time.Time
HasTimeout bool
Cancel context.CancelFunc
+ IsFlusher bool
}
-// PoolWorkersList implements the sort.Interface
+// PoolWorkersList implements the sort.Interface for PoolWorkers
type PoolWorkersList []*PoolWorkers
func init() {
@@ -83,27 +105,28 @@ func GetManager() *Manager {
}
// Add adds a queue to this manager
-func (m *Manager) Add(queue Queue,
+func (m *Manager) Add(managed interface{},
t Type,
configuration,
- exemplar interface{},
- pool ManagedPool) int64 {
+ exemplar interface{}) int64 {
cfg, _ := json.Marshal(configuration)
mq := &ManagedQueue{
- Queue: queue,
Type: t,
Configuration: string(cfg),
ExemplarType: reflect.TypeOf(exemplar).String(),
PoolWorkers: make(map[int64]*PoolWorkers),
- Pool: pool,
+ Managed: managed,
}
m.mutex.Lock()
m.counter++
mq.QID = m.counter
mq.Name = fmt.Sprintf("queue-%d", mq.QID)
- if named, ok := queue.(Named); ok {
- mq.Name = named.Name()
+ if named, ok := managed.(Named); ok {
+ name := named.Name()
+ if len(name) > 0 {
+ mq.Name = name
+ }
}
m.Queues[mq.QID] = mq
m.mutex.Unlock()
@@ -127,6 +150,64 @@ func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
return m.Queues[qid]
}
+// FlushAll flushes all the flushable queues attached to this manager
+func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
+ var ctx context.Context
+ var cancel context.CancelFunc
+ start := time.Now()
+ end := start
+ hasTimeout := false
+ if timeout > 0 {
+ ctx, cancel = context.WithTimeout(baseCtx, timeout)
+ end = start.Add(timeout)
+ hasTimeout = true
+ } else {
+ ctx, cancel = context.WithCancel(baseCtx)
+ }
+ defer cancel()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ mqs := m.ManagedQueues()
+ wg := sync.WaitGroup{}
+ wg.Add(len(mqs))
+ allEmpty := true
+ for _, mq := range mqs {
+ if mq.IsEmpty() {
+ wg.Done()
+ continue
+ }
+ allEmpty = false
+ if flushable, ok := mq.Managed.(Flushable); ok {
+ go func() {
+ localCtx, localCancel := context.WithCancel(ctx)
+ pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
+ err := flushable.FlushWithContext(localCtx)
+ if err != nil && err != ctx.Err() {
+ cancel()
+ }
+ mq.CancelWorkers(pid)
+ localCancel()
+ wg.Done()
+ }()
+ } else {
+ wg.Done()
+ }
+
+ }
+ if allEmpty {
+ break
+ }
+ wg.Wait()
+ }
+ return nil
+
+}
+
// ManagedQueues returns the managed queues
func (m *Manager) ManagedQueues() []*ManagedQueue {
m.mutex.Lock()
@@ -152,7 +233,7 @@ func (q *ManagedQueue) Workers() []*PoolWorkers {
}
// RegisterWorkers registers workers to this queue
-func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
+func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
q.mutex.Lock()
defer q.mutex.Unlock()
q.counter++
@@ -163,6 +244,7 @@ func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout b
Timeout: timeout,
HasTimeout: hasTimeout,
Cancel: cancel,
+ IsFlusher: isFlusher,
}
return q.counter
}
@@ -191,57 +273,74 @@ func (q *ManagedQueue) RemoveWorkers(pid int64) {
// AddWorkers adds workers to the queue if it has registered an add worker function
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
- if q.Pool != nil {
+ if pool, ok := q.Managed.(ManagedPool); ok {
// the cancel will be added to the pool workers description above
- return q.Pool.AddWorkers(number, timeout)
+ return pool.AddWorkers(number, timeout)
}
return nil
}
+// Flush flushes the queue with a timeout
+func (q *ManagedQueue) Flush(timeout time.Duration) error {
+ if flushable, ok := q.Managed.(Flushable); ok {
+ // the cancel will be added to the pool workers description above
+ return flushable.Flush(timeout)
+ }
+ return nil
+}
+
+// IsEmpty returns if the queue is empty
+func (q *ManagedQueue) IsEmpty() bool {
+ if flushable, ok := q.Managed.(Flushable); ok {
+ return flushable.IsEmpty()
+ }
+ return true
+}
+
// NumberOfWorkers returns the number of workers in the queue
func (q *ManagedQueue) NumberOfWorkers() int {
- if q.Pool != nil {
- return q.Pool.NumberOfWorkers()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.NumberOfWorkers()
}
return -1
}
// MaxNumberOfWorkers returns the maximum number of workers for the pool
func (q *ManagedQueue) MaxNumberOfWorkers() int {
- if q.Pool != nil {
- return q.Pool.MaxNumberOfWorkers()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.MaxNumberOfWorkers()
}
return 0
}
// BoostWorkers returns the number of workers for a boost
func (q *ManagedQueue) BoostWorkers() int {
- if q.Pool != nil {
- return q.Pool.BoostWorkers()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.BoostWorkers()
}
return -1
}
// BoostTimeout returns the timeout of the next boost
func (q *ManagedQueue) BoostTimeout() time.Duration {
- if q.Pool != nil {
- return q.Pool.BoostTimeout()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.BoostTimeout()
}
return 0
}
// BlockTimeout returns the timeout til the next boost
func (q *ManagedQueue) BlockTimeout() time.Duration {
- if q.Pool != nil {
- return q.Pool.BlockTimeout()
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.BlockTimeout()
}
return 0
}
-// SetSettings sets the setable boost values
-func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
- if q.Pool != nil {
- q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
+// SetPoolSettings sets the setable boost values
+func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
}
}
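
Since a ManagedQueue now holds an opaque Managed item and type-asserts to Flushable or ManagedPool on demand, monitoring or admin code can flush a single queue through the manager. A hedged sketch (the queue name and timeout are illustrative):

```go
package main

import (
	"time"

	"code.gitea.io/gitea/modules/queue"
)

// flushByName looks a queue up by name through the manager and flushes it
// with a 30 second budget, skipping it when it is already empty. Flush is a
// no-op when the managed item does not implement Flushable.
func flushByName(name string) error {
	for _, mq := range queue.GetManager().ManagedQueues() {
		if mq.Name != name || mq.IsEmpty() {
			continue
		}
		return mq.Flush(30 * time.Second)
	}
	return nil
}
```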
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index d458a7d506..094699d4af 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -6,9 +6,8 @@ package queue
import (
"context"
- "encoding/json"
"fmt"
- "reflect"
+ "time"
)
// ErrInvalidConfiguration is called when there is invalid configuration for a queue
@@ -53,8 +52,11 @@ type Named interface {
Name() string
}
-// Queue defines an interface to save an issue indexer queue
+// Queue defines an interface of a queue-like item
+//
+// Queues will handle their own contents in the Run method
type Queue interface {
+ Flushable
Run(atShutdown, atTerminate func(context.Context, func()))
Push(Data) error
}
@@ -71,32 +73,27 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro
type DummyQueue struct {
}
-// Run starts to run the queue
+// Run does nothing
func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
-// Push pushes data to the queue
+// Push fakes a push of data to the queue
func (b *DummyQueue) Push(Data) error {
return nil
}
-func toConfig(exemplar, cfg interface{}) (interface{}, error) {
- if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
- return cfg, nil
- }
+// Flush always returns nil
+func (b *DummyQueue) Flush(time.Duration) error {
+ return nil
+}
- configBytes, ok := cfg.([]byte)
- if !ok {
- configStr, ok := cfg.(string)
- if !ok {
- return nil, ErrInvalidConfiguration{cfg: cfg}
- }
- configBytes = []byte(configStr)
- }
- newVal := reflect.New(reflect.TypeOf(exemplar))
- if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
- return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
- }
- return newVal.Elem().Interface(), nil
+// FlushWithContext always returns nil
+func (b *DummyQueue) FlushWithContext(context.Context) error {
+ return nil
+}
+
+// IsEmpty asserts that the queue is empty
+func (b *DummyQueue) IsEmpty() bool {
+ return true
}
var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
@@ -123,7 +120,7 @@ func RegisteredTypesAsString() []string {
return types
}
-// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
+// NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
newFn, ok := queuesMap[queueType]
if !ok {
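
Because Queue now embeds Flushable, even the DummyQueue has to provide Flush, FlushWithContext and IsEmpty. A small sketch of the generic constructor with the dummy type (the handler body is illustrative):

```go
package main

import (
	"code.gitea.io/gitea/modules/queue"
)

// newNoopQueue builds a queue through the generic constructor. The dummy
// queue discards pushes, so its Flush returns nil and IsEmpty is always true.
func newNoopQueue() (queue.Queue, error) {
	handler := func(data ...queue.Data) {
		// process a batch of items here
	}
	return queue.NewQueue(queue.DummyQueueType, handler, nil, nil)
}
```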
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index c8f8a53804..45df8a443e 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -7,8 +7,6 @@ package queue
import (
"context"
"fmt"
- "reflect"
- "time"
"code.gitea.io/gitea/modules/log"
)
@@ -18,25 +16,23 @@ const ChannelQueueType Type = "channel"
// ChannelQueueConfiguration is the configuration for a ChannelQueue
type ChannelQueueConfiguration struct {
- QueueLength int
- BatchLength int
- Workers int
- MaxWorkers int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
- Name string
+ WorkerPoolConfiguration
+ Workers int
+ Name string
}
-// ChannelQueue implements
+// ChannelQueue implements Queue
+//
+// A channel queue is not persistable and does not shutdown or terminate cleanly
+// It is basically a very thin wrapper around a WorkerPool
type ChannelQueue struct {
- pool *WorkerPool
+ *WorkerPool
exemplar interface{}
workers int
name string
}
-// NewChannelQueue create a memory channel queue
+// NewChannelQueue creates a memory channel queue
func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
if err != nil {
@@ -46,26 +42,13 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
if config.BatchLength == 0 {
config.BatchLength = 1
}
- dataChan := make(chan Data, config.QueueLength)
-
- ctx, cancel := context.WithCancel(context.Background())
queue := &ChannelQueue{
- pool: &WorkerPool{
- baseCtx: ctx,
- cancel: cancel,
- batchLength: config.BatchLength,
- handle: handle,
- dataChan: dataChan,
- blockTimeout: config.BlockTimeout,
- boostTimeout: config.BoostTimeout,
- boostWorkers: config.BoostWorkers,
- maxNumberOfWorkers: config.MaxWorkers,
- },
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
}
- queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool)
+ queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
return queue, nil
}
@@ -77,22 +60,18 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())
atTerminate(context.Background(), func() {
log.Warn("ChannelQueue: %s is not terminatable!", c.name)
})
+ log.Debug("ChannelQueue: %s Starting", c.name)
go func() {
- _ = c.pool.AddWorkers(c.workers, 0)
+ _ = c.AddWorkers(c.workers, 0)
}()
}
// Push will push data into the queue
func (c *ChannelQueue) Push(data Data) error {
- if c.exemplar != nil {
- // Assert data is of same type as r.exemplar
- t := reflect.TypeOf(data)
- exemplarType := reflect.TypeOf(c.exemplar)
- if !t.AssignableTo(exemplarType) || data == nil {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
- }
+ if !assignableTo(data, c.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
}
- c.pool.Push(data)
+ c.WorkerPool.Push(data)
return nil
}
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index fafc1e3303..8234b0f6f2 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -25,12 +25,14 @@ func TestChannelQueue(t *testing.T) {
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
- QueueLength: 20,
- Workers: 1,
- MaxWorkers: 10,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: 20,
+ MaxWorkers: 10,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ },
+ Workers: 1,
}, &testData{})
assert.NoError(t, err)
@@ -60,13 +62,15 @@ func TestChannelQueue_Batch(t *testing.T) {
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
- QueueLength: 20,
- BatchLength: 2,
- Workers: 1,
- MaxWorkers: 10,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: 20,
+ BatchLength: 2,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ MaxWorkers: 10,
+ },
+ Workers: 1,
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index 98e7b24e42..ca3e230e3d 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -8,8 +8,8 @@ import (
"context"
"encoding/json"
"fmt"
- "reflect"
"sync"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
@@ -22,20 +22,15 @@ const LevelQueueType Type = "level"
// LevelQueueConfiguration is the configuration for a LevelQueue
type LevelQueueConfiguration struct {
- DataDir string
- QueueLength int
- BatchLength int
- Workers int
- MaxWorkers int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
- Name string
+ WorkerPoolConfiguration
+ DataDir string
+ Workers int
+ Name string
}
// LevelQueue implements a disk library queue
type LevelQueue struct {
- pool *WorkerPool
+ *WorkerPool
queue *levelqueue.Queue
closed chan struct{}
terminated chan struct{}
@@ -58,21 +53,8 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
return nil, err
}
- dataChan := make(chan Data, config.QueueLength)
- ctx, cancel := context.WithCancel(context.Background())
-
queue := &LevelQueue{
- pool: &WorkerPool{
- baseCtx: ctx,
- cancel: cancel,
- batchLength: config.BatchLength,
- handle: handle,
- dataChan: dataChan,
- blockTimeout: config.BlockTimeout,
- boostTimeout: config.BoostTimeout,
- boostWorkers: config.BoostWorkers,
- maxNumberOfWorkers: config.MaxWorkers,
- },
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
queue: internal,
exemplar: exemplar,
closed: make(chan struct{}),
@@ -80,7 +62,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
workers: config.Workers,
name: config.Name,
}
- queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool)
+ queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
return queue, nil
}
@@ -88,9 +70,10 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), l.Shutdown)
atTerminate(context.Background(), l.Terminate)
+ log.Debug("LevelQueue: %s Starting", l.name)
go func() {
- _ = l.pool.AddWorkers(l.workers, 0)
+ _ = l.AddWorkers(l.workers, 0)
}()
go l.readToChan()
@@ -99,12 +82,12 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
<-l.closed
log.Trace("LevelQueue: %s Waiting til done", l.name)
- l.pool.Wait()
+ l.Wait()
log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
ctx, cancel := context.WithCancel(context.Background())
atTerminate(ctx, cancel)
- l.pool.CleanUp(ctx)
+ l.CleanUp(ctx)
cancel()
log.Trace("LevelQueue: %s Cleaned", l.name)
@@ -115,56 +98,45 @@ func (l *LevelQueue) readToChan() {
select {
case <-l.closed:
// tell the pool to shutdown.
- l.pool.cancel()
+ l.cancel()
return
default:
+ atomic.AddInt64(&l.numInQueue, 1)
bs, err := l.queue.RPop()
if err != nil {
if err != levelqueue.ErrNotFound {
log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
}
+ atomic.AddInt64(&l.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
if len(bs) == 0 {
+ atomic.AddInt64(&l.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
- var data Data
- if l.exemplar != nil {
- t := reflect.TypeOf(l.exemplar)
- n := reflect.New(t)
- ne := n.Elem()
- err = json.Unmarshal(bs, ne.Addr().Interface())
- data = ne.Interface().(Data)
- } else {
- err = json.Unmarshal(bs, &data)
- }
+ data, err := unmarshalAs(bs, l.exemplar)
if err != nil {
log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
+ atomic.AddInt64(&l.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
- l.pool.Push(data)
-
+ l.WorkerPool.Push(data)
+ atomic.AddInt64(&l.numInQueue, -1)
}
}
}
// Push will push the indexer data to queue
func (l *LevelQueue) Push(data Data) error {
- if l.exemplar != nil {
- // Assert data is of same type as r.exemplar
- value := reflect.ValueOf(data)
- t := value.Type()
- exemplarType := reflect.ValueOf(l.exemplar).Type()
- if !t.AssignableTo(exemplarType) || data == nil {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
- }
+ if !assignableTo(data, l.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
}
bs, err := json.Marshal(data)
if err != nil {
@@ -173,16 +145,25 @@ func (l *LevelQueue) Push(data Data) error {
return l.queue.LPush(bs)
}
+// IsEmpty checks whether the queue is empty
+func (l *LevelQueue) IsEmpty() bool {
+ if !l.WorkerPool.IsEmpty() {
+ return false
+ }
+ return l.queue.Len() == 0
+}
+
// Shutdown this queue and stop processing
func (l *LevelQueue) Shutdown() {
l.lock.Lock()
defer l.lock.Unlock()
- log.Trace("LevelQueue: %s Shutdown", l.name)
+ log.Trace("LevelQueue: %s Shutting down", l.name)
select {
case <-l.closed:
default:
close(l.closed)
}
+ log.Debug("LevelQueue: %s Shutdown", l.name)
}
// Terminate this queue and close the queue
@@ -196,11 +177,15 @@ func (l *LevelQueue) Terminate() {
default:
close(l.terminated)
l.lock.Unlock()
+ if log.IsDebug() {
+ log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len())
+ }
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
}
}
+ log.Debug("LevelQueue: %s Terminated", l.name)
}
// Name returns the name of this queue
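
LevelQueue.IsEmpty above (and RedisQueue.IsEmpty further down) follow the same rule: a persistent queue is only empty once both the in-memory worker pool channel and the backing store have drained. The sketch below, with a hypothetical backingLen callback, restates that pattern:

```go
package queue

// isDrained sketches the IsEmpty pattern used by the persistent queues:
// first check the worker pool's in-flight counter, then the backing store.
// backingLen is a hypothetical callback returning the number of stored items.
func isDrained(pool *WorkerPool, backingLen func() int64) bool {
	if !pool.IsEmpty() {
		return false
	}
	return backingLen() == 0
}
```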
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 6bb5a1be97..961187ab0d 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -6,7 +6,9 @@ package queue
import (
"context"
+ "fmt"
"sync"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
@@ -31,8 +33,10 @@ type PersistableChannelQueueConfiguration struct {
}
// PersistableChannelQueue wraps a channel queue and level queue together
+// The disk level queue will be used to store data at shutdown and terminate - and will be restored
+// on start up.
type PersistableChannelQueue struct {
- *ChannelQueue
+ channelQueue *ChannelQueue
delayedStarter
lock sync.Mutex
closed chan struct{}
@@ -48,14 +52,16 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
config := configInterface.(PersistableChannelQueueConfiguration)
channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
- QueueLength: config.QueueLength,
- BatchLength: config.BatchLength,
- Workers: config.Workers,
- MaxWorkers: config.MaxWorkers,
- BlockTimeout: config.BlockTimeout,
- BoostTimeout: config.BoostTimeout,
- BoostWorkers: config.BoostWorkers,
- Name: config.Name + "-channel",
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: config.QueueLength,
+ BatchLength: config.BatchLength,
+ BlockTimeout: config.BlockTimeout,
+ BoostTimeout: config.BoostTimeout,
+ BoostWorkers: config.BoostWorkers,
+ MaxWorkers: config.MaxWorkers,
+ },
+ Workers: config.Workers,
+ Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@@ -63,28 +69,30 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
// the level backend only needs temporary workers to catch up with the previously dropped work
levelCfg := LevelQueueConfiguration{
- DataDir: config.DataDir,
- QueueLength: config.QueueLength,
- BatchLength: config.BatchLength,
- Workers: 1,
- MaxWorkers: 6,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- Name: config.Name + "-level",
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: config.QueueLength,
+ BatchLength: config.BatchLength,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ MaxWorkers: 6,
+ },
+ DataDir: config.DataDir,
+ Workers: 1,
+ Name: config.Name + "-level",
}
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
if err == nil {
queue := &PersistableChannelQueue{
- ChannelQueue: channelQueue.(*ChannelQueue),
+ channelQueue: channelQueue.(*ChannelQueue),
delayedStarter: delayedStarter{
internal: levelQueue.(*LevelQueue),
name: config.Name,
},
closed: make(chan struct{}),
}
- _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+ _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
return queue, nil
}
if IsErrInvalidConfiguration(err) {
@@ -93,7 +101,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
}
queue := &PersistableChannelQueue{
- ChannelQueue: channelQueue.(*ChannelQueue),
+ channelQueue: channelQueue.(*ChannelQueue),
delayedStarter: delayedStarter{
cfg: levelCfg,
underlying: LevelQueueType,
@@ -103,7 +111,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
},
closed: make(chan struct{}),
}
- _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+ _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
return queue, nil
}
@@ -118,15 +126,17 @@ func (p *PersistableChannelQueue) Push(data Data) error {
case <-p.closed:
return p.internal.Push(data)
default:
- return p.ChannelQueue.Push(data)
+ return p.channelQueue.Push(data)
}
}
// Run starts to run the queue
func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name)
+
p.lock.Lock()
if p.internal == nil {
- err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
+ err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar)
p.lock.Unlock()
if err != nil {
log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
@@ -142,31 +152,83 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
go func() {
- _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
+ _ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0)
}()
log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
<-p.closed
log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
- p.ChannelQueue.pool.cancel()
- p.internal.(*LevelQueue).pool.cancel()
+ p.channelQueue.cancel()
+ p.internal.(*LevelQueue).cancel()
log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
- p.ChannelQueue.pool.Wait()
- p.internal.(*LevelQueue).pool.Wait()
+ p.channelQueue.Wait()
+ p.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
- for data := range p.ChannelQueue.pool.dataChan {
+ for data := range p.channelQueue.dataChan {
_ = p.internal.Push(data)
+ atomic.AddInt64(&p.channelQueue.numInQueue, -1)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
}()
log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
}
+// Flush flushes the queue and blocks till the queue is empty
+func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
+ var ctx context.Context
+ var cancel context.CancelFunc
+ if timeout > 0 {
+ ctx, cancel = context.WithTimeout(context.Background(), timeout)
+ } else {
+ ctx, cancel = context.WithCancel(context.Background())
+ }
+ defer cancel()
+ return p.FlushWithContext(ctx)
+}
+
+// FlushWithContext flushes the queue and blocks till the queue is empty
+func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
+ errChan := make(chan error, 1)
+ go func() {
+ errChan <- p.channelQueue.FlushWithContext(ctx)
+ }()
+ go func() {
+ p.lock.Lock()
+ if p.internal == nil {
+ p.lock.Unlock()
+ errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name())
+ return
+ }
+ p.lock.Unlock()
+ errChan <- p.internal.FlushWithContext(ctx)
+ }()
+ err1 := <-errChan
+ err2 := <-errChan
+
+ if err1 != nil {
+ return err1
+ }
+ return err2
+}
+
+// IsEmpty checks if a queue is empty
+func (p *PersistableChannelQueue) IsEmpty() bool {
+ if !p.channelQueue.IsEmpty() {
+ return false
+ }
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ if p.internal == nil {
+ return false
+ }
+ return p.internal.IsEmpty()
+}
+
// Shutdown processing this queue
func (p *PersistableChannelQueue) Shutdown() {
- log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
+ log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name)
select {
case <-p.closed:
default:
@@ -177,6 +239,7 @@ func (p *PersistableChannelQueue) Shutdown() {
}
close(p.closed)
}
+ log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
}
// Terminate this queue and close the queue
@@ -188,6 +251,7 @@ func (p *PersistableChannelQueue) Terminate() {
if p.internal != nil {
p.internal.(*LevelQueue).Terminate()
}
+ log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name)
}
func init() {
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
index c5959d606f..8600b8d868 100644
--- a/modules/queue/queue_disk_test.go
+++ b/modules/queue/queue_disk_test.go
@@ -32,14 +32,16 @@ func TestLevelQueue(t *testing.T) {
defer os.RemoveAll(tmpDir)
queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 2,
- Workers: 1,
- MaxWorkers: 10,
- QueueLength: 20,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: 20,
+ BatchLength: 2,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ MaxWorkers: 10,
+ },
+ DataDir: tmpDir,
+ Workers: 1,
}, &testData{})
assert.NoError(t, err)
@@ -92,14 +94,16 @@ func TestLevelQueue(t *testing.T) {
WrappedQueueConfiguration{
Underlying: LevelQueueType,
Config: LevelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 2,
- Workers: 1,
- MaxWorkers: 10,
- QueueLength: 20,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: 20,
+ BatchLength: 2,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ MaxWorkers: 10,
+ },
+ DataDir: tmpDir,
+ Workers: 1,
},
}, &testData{})
assert.NoError(t, err)
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
index 7d3efb9cff..0167c1ec49 100644
--- a/modules/queue/queue_redis.go
+++ b/modules/queue/queue_redis.go
@@ -9,9 +9,9 @@ import (
"encoding/json"
"errors"
"fmt"
- "reflect"
"strings"
"sync"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
@@ -25,13 +25,14 @@ const RedisQueueType Type = "redis"
type redisClient interface {
RPush(key string, args ...interface{}) *redis.IntCmd
LPop(key string) *redis.StringCmd
+ LLen(key string) *redis.IntCmd
Ping() *redis.StatusCmd
Close() error
}
// RedisQueue redis queue
type RedisQueue struct {
- pool *WorkerPool
+ *WorkerPool
client redisClient
queueName string
closed chan struct{}
@@ -44,19 +45,14 @@ type RedisQueue struct {
// RedisQueueConfiguration is the configuration for the redis queue
type RedisQueueConfiguration struct {
- Network string
- Addresses string
- Password string
- DBIndex int
- BatchLength int
- QueueLength int
- QueueName string
- Workers int
- MaxWorkers int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
- Name string
+ WorkerPoolConfiguration
+ Network string
+ Addresses string
+ Password string
+ DBIndex int
+ QueueName string
+ Workers int
+ Name string
}
// NewRedisQueue creates single redis or cluster redis queue
@@ -69,21 +65,8 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
dbs := strings.Split(config.Addresses, ",")
- dataChan := make(chan Data, config.QueueLength)
- ctx, cancel := context.WithCancel(context.Background())
-
var queue = &RedisQueue{
- pool: &WorkerPool{
- baseCtx: ctx,
- cancel: cancel,
- batchLength: config.BatchLength,
- handle: handle,
- dataChan: dataChan,
- blockTimeout: config.BlockTimeout,
- boostTimeout: config.BoostTimeout,
- boostWorkers: config.BoostWorkers,
- maxNumberOfWorkers: config.MaxWorkers,
- },
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
queueName: config.QueueName,
exemplar: exemplar,
closed: make(chan struct{}),
@@ -108,7 +91,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
if err := queue.client.Ping().Err(); err != nil {
return nil, err
}
- queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool)
+ queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
return queue, nil
}
@@ -117,9 +100,10 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), r.Shutdown)
atTerminate(context.Background(), r.Terminate)
+ log.Debug("RedisQueue: %s Starting", r.name)
go func() {
- _ = r.pool.AddWorkers(r.workers, 0)
+ _ = r.AddWorkers(r.workers, 0)
}()
go r.readToChan()
@@ -127,12 +111,12 @@ func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func()))
log.Trace("RedisQueue: %s Waiting til closed", r.name)
<-r.closed
log.Trace("RedisQueue: %s Waiting til done", r.name)
- r.pool.Wait()
+ r.Wait()
log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
ctx, cancel := context.WithCancel(context.Background())
atTerminate(ctx, cancel)
- r.pool.CleanUp(ctx)
+ r.CleanUp(ctx)
cancel()
}
@@ -141,53 +125,43 @@ func (r *RedisQueue) readToChan() {
select {
case <-r.closed:
// tell the pool to shutdown
- r.pool.cancel()
+ r.cancel()
return
default:
+ atomic.AddInt64(&r.numInQueue, 1)
bs, err := r.client.LPop(r.queueName).Bytes()
if err != nil && err != redis.Nil {
log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
+ atomic.AddInt64(&r.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
if len(bs) == 0 {
+ atomic.AddInt64(&r.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
- var data Data
- if r.exemplar != nil {
- t := reflect.TypeOf(r.exemplar)
- n := reflect.New(t)
- ne := n.Elem()
- err = json.Unmarshal(bs, ne.Addr().Interface())
- data = ne.Interface().(Data)
- } else {
- err = json.Unmarshal(bs, &data)
- }
+ data, err := unmarshalAs(bs, r.exemplar)
if err != nil {
log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
+ atomic.AddInt64(&r.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
- r.pool.Push(data)
+ r.WorkerPool.Push(data)
+ atomic.AddInt64(&r.numInQueue, -1)
}
}
}
// Push implements Queue
func (r *RedisQueue) Push(data Data) error {
- if r.exemplar != nil {
- // Assert data is of same type as r.exemplar
- value := reflect.ValueOf(data)
- t := value.Type()
- exemplarType := reflect.ValueOf(r.exemplar).Type()
- if !t.AssignableTo(exemplarType) || data == nil {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
- }
+ if !assignableTo(data, r.exemplar) {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
}
bs, err := json.Marshal(data)
if err != nil {
@@ -196,9 +170,22 @@ func (r *RedisQueue) Push(data Data) error {
return r.client.RPush(r.queueName, bs).Err()
}
+// IsEmpty checks if the queue is empty
+func (r *RedisQueue) IsEmpty() bool {
+ if !r.WorkerPool.IsEmpty() {
+ return false
+ }
+ length, err := r.client.LLen(r.queueName).Result()
+ if err != nil {
+ log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err)
+ return false
+ }
+ return length == 0
+}
+
// Shutdown processing from this queue
func (r *RedisQueue) Shutdown() {
- log.Trace("Shutdown: %s", r.name)
+ log.Trace("RedisQueue: %s Shutting down", r.name)
r.lock.Lock()
select {
case <-r.closed:
@@ -206,11 +193,12 @@ func (r *RedisQueue) Shutdown() {
close(r.closed)
}
r.lock.Unlock()
+ log.Debug("RedisQueue: %s Shutdown", r.name)
}
// Terminate this queue and close the queue
func (r *RedisQueue) Terminate() {
- log.Trace("Terminating: %s", r.name)
+ log.Trace("RedisQueue: %s Terminating", r.name)
r.Shutdown()
r.lock.Lock()
select {
@@ -219,10 +207,14 @@ func (r *RedisQueue) Terminate() {
default:
close(r.terminated)
r.lock.Unlock()
+ if log.IsDebug() {
+ log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName))
+ }
if err := r.client.Close(); err != nil {
log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
}
}
+ log.Debug("RedisQueue: %s Terminated", r.name)
}
// Name returns the name of this queue
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go
index c52e6e4673..ef90d18608 100644
--- a/modules/queue/queue_wrapped.go
+++ b/modules/queue/queue_wrapped.go
@@ -7,8 +7,8 @@ package queue
import (
"context"
"fmt"
- "reflect"
"sync"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
@@ -56,7 +56,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
for q.internal == nil {
select {
case <-ctx.Done():
- return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name)
+ return fmt.Errorf("Timedout creating queue %v with cfg %s in %s", q.underlying, q.cfg, q.name)
default:
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
if err == nil {
@@ -64,11 +64,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
break
}
if err.Error() != "resource temporarily unavailable" {
- log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %v error: %v", i, q.underlying, q.name, q.cfg, err)
+ log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %s error: %v", i, q.underlying, q.name, q.cfg, err)
}
i++
if q.maxAttempts > 0 && i > q.maxAttempts {
- return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
+ return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
}
sleepTime := 100 * time.Millisecond
if q.timeout > 0 && q.maxAttempts > 0 {
@@ -88,10 +88,11 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h
// WrappedQueue wraps a delayed starting queue
type WrappedQueue struct {
delayedStarter
- lock sync.Mutex
- handle HandlerFunc
- exemplar interface{}
- channel chan Data
+ lock sync.Mutex
+ handle HandlerFunc
+ exemplar interface{}
+ channel chan Data
+ numInQueue int64
}
// NewWrappedQueue will attempt to create a queue of the provided type,
@@ -127,7 +128,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
name: config.Name,
},
}
- _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil)
+ _ = GetManager().Add(queue, WrappedQueueType, config, exemplar)
return queue, nil
}
@@ -138,21 +139,78 @@ func (q *WrappedQueue) Name() string {
// Push will push the data to the internal channel checking it against the exemplar
func (q *WrappedQueue) Push(data Data) error {
- if q.exemplar != nil {
- // Assert data is of same type as r.exemplar
- value := reflect.ValueOf(data)
- t := value.Type()
- exemplarType := reflect.ValueOf(q.exemplar).Type()
- if !t.AssignableTo(exemplarType) || data == nil {
- return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
- }
+ if !assignableTo(data, q.exemplar) {
+ return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
+ atomic.AddInt64(&q.numInQueue, 1)
q.channel <- data
return nil
}
+func (q *WrappedQueue) flushInternalWithContext(ctx context.Context) error {
+ q.lock.Lock()
+ if q.internal == nil {
+ q.lock.Unlock()
+ return fmt.Errorf("not ready to flush wrapped queue %s yet", q.Name())
+ }
+ q.lock.Unlock()
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ return q.internal.FlushWithContext(ctx)
+}
+
+// Flush flushes the queue and blocks till the queue is empty
+func (q *WrappedQueue) Flush(timeout time.Duration) error {
+ var ctx context.Context
+ var cancel context.CancelFunc
+ if timeout > 0 {
+ ctx, cancel = context.WithTimeout(context.Background(), timeout)
+ } else {
+ ctx, cancel = context.WithCancel(context.Background())
+ }
+ defer cancel()
+ return q.FlushWithContext(ctx)
+}
+
+// FlushWithContext implements the final part of Flushable
+func (q *WrappedQueue) FlushWithContext(ctx context.Context) error {
+ log.Trace("WrappedQueue: %s FlushWithContext", q.Name())
+ errChan := make(chan error, 1)
+ go func() {
+ errChan <- q.flushInternalWithContext(ctx)
+ close(errChan)
+ }()
+
+ select {
+ case err := <-errChan:
+ return err
+ case <-ctx.Done():
+ go func() {
+ <-errChan
+ }()
+ return ctx.Err()
+ }
+}
+
+// IsEmpty checks whether the queue is empty
+func (q *WrappedQueue) IsEmpty() bool {
+ if atomic.LoadInt64(&q.numInQueue) != 0 {
+ return false
+ }
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if q.internal == nil {
+ return false
+ }
+ return q.internal.IsEmpty()
+}
+
// Run starts to run the queue and attempts to create the internal queue
func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ log.Debug("WrappedQueue: %s Starting", q.name)
q.lock.Lock()
if q.internal == nil {
err := q.setInternal(atShutdown, q.handle, q.exemplar)
@@ -164,6 +222,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())
go func() {
for data := range q.channel {
_ = q.internal.Push(data)
+ atomic.AddInt64(&q.numInQueue, -1)
}
}()
} else {
@@ -176,7 +235,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())
// Shutdown this queue and stop processing
func (q *WrappedQueue) Shutdown() {
- log.Trace("WrappedQueue: %s Shutdown", q.name)
+ log.Trace("WrappedQueue: %s Shutting down", q.name)
q.lock.Lock()
defer q.lock.Unlock()
if q.internal == nil {
@@ -185,6 +244,7 @@ func (q *WrappedQueue) Shutdown() {
if shutdownable, ok := q.internal.(Shutdownable); ok {
shutdownable.Shutdown()
}
+ log.Debug("WrappedQueue: %s Shutdown", q.name)
}
// Terminate this queue and close the queue
@@ -198,6 +258,7 @@ func (q *WrappedQueue) Terminate() {
if shutdownable, ok := q.internal.(Shutdownable); ok {
shutdownable.Terminate()
}
+ log.Debug("WrappedQueue: %s Terminated", q.name)
}
func init() {
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
index d5a6b41882..8760c09ae8 100644
--- a/modules/queue/setting.go
+++ b/modules/queue/setting.go
@@ -24,8 +24,7 @@ func validType(t string) (Type, error) {
return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
}
-// CreateQueue for name with provided handler and exemplar
-func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
+func getQueueSettings(name string) (setting.QueueSettings, []byte) {
q := setting.GetQueueSettings(name)
opts := make(map[string]interface{})
opts["Name"] = name
@@ -43,24 +42,33 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
opts["BoostTimeout"] = q.BoostTimeout
opts["BoostWorkers"] = q.BoostWorkers
- typ, err := validType(q.Type)
- if err != nil {
- log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
- }
-
cfg, err := json.Marshal(opts)
if err != nil {
log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
log.Error("Unable to create queue for %s", name, err)
+ return q, []byte{}
+ }
+ return q, cfg
+}
+
+// CreateQueue for name with provided handler and exemplar
+func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
+ q, cfg := getQueueSettings(name)
+ if len(cfg) == 0 {
return nil
}
+ typ, err := validType(q.Type)
+ if err != nil {
+ log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
+ }
+
returnable, err := NewQueue(typ, handle, cfg, exemplar)
if q.WrapIfNecessary && err != nil {
log.Warn("Unable to create queue for %s: %v", name, err)
log.Warn("Attempting to create wrapped queue")
returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{
- Underlying: Type(q.Type),
+ Underlying: typ,
Timeout: q.Timeout,
MaxAttempts: q.MaxAttempts,
Config: cfg,
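
With getQueueSettings split out, CreateQueue remains the settings-driven entry point. A sketch of creating and running such a queue, assuming Gitea's settings are already loaded; the queue name and payload type are hypothetical:

```go
package main

import (
	"context"

	"code.gitea.io/gitea/modules/queue"
)

// examplePayload is a hypothetical item type used as the queue exemplar.
type examplePayload struct {
	ID int64
}

// startExampleQueue creates a settings-backed queue and starts it. When the
// configured type cannot be created immediately and the settings allow
// wrapping (WrapIfNecessary), CreateQueue falls back to a wrapped queue.
func startExampleQueue(atShutdown, atTerminate func(context.Context, func())) queue.Queue {
	q := queue.CreateQueue("example", func(data ...queue.Data) {
		// handle a batch of examplePayload items
	}, examplePayload{})
	if q != nil {
		go q.Run(atShutdown, atTerminate)
	}
	return q
}
```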
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 25fc7dd644..63ec897481 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -7,12 +7,16 @@ package queue
import (
"context"
"sync"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
)
-// WorkerPool takes
+// WorkerPool represent a dynamically growable worker pool for a
+// provided handler function. They have an internal channel which
+// they use to detect if there is a block and will grow and shrink in
+// response to demand as per configuration.
type WorkerPool struct {
lock sync.Mutex
baseCtx context.Context
@@ -27,10 +31,42 @@ type WorkerPool struct {
blockTimeout time.Duration
boostTimeout time.Duration
boostWorkers int
+ numInQueue int64
+}
+
+// WorkerPoolConfiguration is the basic configuration for a WorkerPool
+type WorkerPoolConfiguration struct {
+ QueueLength int
+ BatchLength int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+ MaxWorkers int
+}
+
+// NewWorkerPool creates a new worker pool
+func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ dataChan := make(chan Data, config.QueueLength)
+ pool := &WorkerPool{
+ baseCtx: ctx,
+ cancel: cancel,
+ batchLength: config.BatchLength,
+ dataChan: dataChan,
+ handle: handle,
+ blockTimeout: config.BlockTimeout,
+ boostTimeout: config.BoostTimeout,
+ boostWorkers: config.BoostWorkers,
+ maxNumberOfWorkers: config.MaxWorkers,
+ }
+
+ return pool
}
// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
+ atomic.AddInt64(&p.numInQueue, 1)
p.lock.Lock()
if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
p.lock.Unlock()
@@ -80,7 +116,7 @@ func (p *WorkerPool) pushBoost(data Data) {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now()
- pid := mq.RegisterWorkers(boost, start, false, start, cancel)
+ pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
go func() {
<-ctx.Done()
mq.RemoveWorkers(pid)
@@ -138,8 +174,8 @@ func (p *WorkerPool) BlockTimeout() time.Duration {
return p.blockTimeout
}
-// SetSettings sets the setable boost values
-func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+// SetPoolSettings sets the setable boost values
+func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
p.lock.Lock()
defer p.lock.Unlock()
p.maxNumberOfWorkers = maxNumberOfWorkers
@@ -156,8 +192,7 @@ func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
p.maxNumberOfWorkers = newMax
}
-// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
-func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) {
var ctx context.Context
var cancel context.CancelFunc
start := time.Now()
@@ -173,7 +208,7 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
mq := GetManager().GetManagedQueue(p.qid)
if mq != nil {
- pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
+ pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
go func() {
<-ctx.Done()
mq.RemoveWorkers(pid)
@@ -184,6 +219,12 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
}
+ return ctx, cancel
+}
+
+// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
+func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+ ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
p.addWorkers(ctx, number)
return cancel
}
@@ -235,6 +276,7 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
close(p.dataChan)
for data := range p.dataChan {
p.handle(data)
+ atomic.AddInt64(&p.numInQueue, -1)
select {
case <-ctx.Done():
log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
@@ -245,6 +287,37 @@ func (p *WorkerPool) CleanUp(ctx context.Context) {
log.Trace("WorkerPool: %d CleanUp Done", p.qid)
}
+// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
+func (p *WorkerPool) Flush(timeout time.Duration) error {
+ ctx, cancel := p.commonRegisterWorkers(1, timeout, true)
+ defer cancel()
+ return p.FlushWithContext(ctx)
+}
+
+// IsEmpty returns if true if the worker queue is empty
+func (p *WorkerPool) IsEmpty() bool {
+ return atomic.LoadInt64(&p.numInQueue) == 0
+}
+
+// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
+// NB: The worker will not be registered with the manager.
+func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
+ log.Trace("WorkerPool: %d Flush", p.qid)
+ for {
+ select {
+ case data := <-p.dataChan:
+ p.handle(data)
+ atomic.AddInt64(&p.numInQueue, -1)
+ case <-p.baseCtx.Done():
+ return p.baseCtx.Err()
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ return nil
+ }
+ }
+}
+
func (p *WorkerPool) doWork(ctx context.Context) {
delay := time.Millisecond * 300
var data = make([]Data, 0, p.batchLength)
@@ -254,6 +327,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -263,6 +337,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -271,6 +346,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) >= p.batchLength {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
data = make([]Data, 0, p.batchLength)
}
default:
@@ -286,6 +362,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -301,6 +378,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
@@ -309,6 +387,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) >= p.batchLength {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
data = make([]Data, 0, p.batchLength)
}
case <-timer.C:
@@ -316,6 +395,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
p.handle(data...)
+ atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
data = make([]Data, 0, p.batchLength)
}
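
The net effect of NewWorkerPool and the numInQueue counter is that any queue can embed a *WorkerPool and get Flush, FlushWithContext and IsEmpty almost for free. A final sketch with illustrative settings:

```go
package main

import (
	"time"

	"code.gitea.io/gitea/modules/queue"
)

// newExamplePool wires up a worker pool the way the queues above now do,
// via NewWorkerPool rather than by filling the struct by hand. The numbers
// are illustrative.
func newExamplePool() *queue.WorkerPool {
	handler := func(data ...queue.Data) {
		// consume a batch of up to BatchLength items
	}
	return queue.NewWorkerPool(handler, queue.WorkerPoolConfiguration{
		QueueLength:  20,
		BatchLength:  2,
		BlockTimeout: 1 * time.Second,
		BoostTimeout: 5 * time.Minute,
		BoostWorkers: 5,
		MaxWorkers:   10,
	})
}

// Later, Push items and drain with pool.Flush(30 * time.Second), which returns
// once the internal channel is empty and registers the flush worker with the
// manager when the pool belongs to a managed queue.
```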