diff options
Diffstat (limited to 'modules/queue/queue_channel.go')
-rw-r--r-- | modules/queue/queue_channel.go | 76 |
1 files changed, 59 insertions, 17 deletions
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index d7a11e79f5..4df64b69ee 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -27,9 +27,13 @@ type ChannelQueueConfiguration struct { // It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { *WorkerPool - exemplar interface{} - workers int - name string + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelQueue creates a memory channel queue @@ -42,28 +46,30 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } // Run starts to run the queue -func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("ChannelQueue: %s is not shutdownable!", q.name) - }) - atTerminate(context.Background(), func() { - log.Warn("ChannelQueue: %s is not terminatable!", q.name) - }) +func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue @@ -75,6 +81,42 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Shutdown processing from this queue +func (q *ChannelQueue) Shutdown() { + q.lock.Lock() + defer q.lock.Unlock() + select { + case <-q.shutdownCtx.Done(): + log.Trace("ChannelQueue: %s Already Shutting down", q.name) + return + default: + } + log.Trace("ChannelQueue: %s Shutting down", q.name) + go func() { + log.Trace("ChannelQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelQueue: %s Flushed", q.name) + }() + q.shutdownCtxCancel() + log.Debug("ChannelQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelQueue) Terminate() { + log.Trace("ChannelQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCtxCancel() + log.Debug("ChannelQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelQueue) Name() string { return q.name |