diff options
Diffstat (limited to 'modules/queue/queue_channel.go')
-rw-r--r-- | modules/queue/queue_channel.go | 59 |
1 files changed, 19 insertions, 40 deletions
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index c8f8a53804..45df8a443e 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,8 +7,6 @@ package queue import ( "context" "fmt" - "reflect" - "time" "code.gitea.io/gitea/modules/log" ) @@ -18,25 +16,23 @@ const ChannelQueueType Type = "channel" // ChannelQueueConfiguration is the configuration for a ChannelQueue type ChannelQueueConfiguration struct { - QueueLength int - BatchLength int - Workers int - MaxWorkers int - BlockTimeout time.Duration - BoostTimeout time.Duration - BoostWorkers int - Name string + WorkerPoolConfiguration + Workers int + Name string } -// ChannelQueue implements +// ChannelQueue implements Queue +// +// A channel queue is not persistable and does not shutdown or terminate cleanly +// It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { - pool *WorkerPool + *WorkerPool exemplar interface{} workers int name string } -// NewChannelQueue create a memory channel queue +// NewChannelQueue creates a memory channel queue func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg) if err != nil { @@ -46,26 +42,13 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro if config.BatchLength == 0 { config.BatchLength = 1 } - dataChan := make(chan Data, config.QueueLength) - - ctx, cancel := context.WithCancel(context.Background()) queue := &ChannelQueue{ - pool: &WorkerPool{ - baseCtx: ctx, - cancel: cancel, - batchLength: config.BatchLength, - handle: handle, - dataChan: dataChan, - blockTimeout: config.BlockTimeout, - boostTimeout: config.BoostTimeout, - boostWorkers: config.BoostWorkers, - maxNumberOfWorkers: config.MaxWorkers, - }, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } - queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool) + queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } @@ -77,22 +60,18 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()) atTerminate(context.Background(), func() { log.Warn("ChannelQueue: %s is not terminatable!", c.name) }) + log.Debug("ChannelQueue: %s Starting", c.name) go func() { - _ = c.pool.AddWorkers(c.workers, 0) + _ = c.AddWorkers(c.workers, 0) }() } // Push will push data into the queue func (c *ChannelQueue) Push(data Data) error { - if c.exemplar != nil { - // Assert data is of same type as r.exemplar - t := reflect.TypeOf(data) - exemplarType := reflect.TypeOf(c.exemplar) - if !t.AssignableTo(exemplarType) || data == nil { - return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) - } + if !assignableTo(data, c.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name) } - c.pool.Push(data) + c.WorkerPool.Push(data) return nil } |