aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/queue_channel.go')
-rw-r--r--modules/queue/queue_channel.go76
1 files changed, 59 insertions, 17 deletions
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index d7a11e79f5..4df64b69ee 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -27,9 +27,13 @@ type ChannelQueueConfiguration struct {
// It is basically a very thin wrapper around a WorkerPool
type ChannelQueue struct {
*WorkerPool
- exemplar interface{}
- workers int
- name string
+ shutdownCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ terminateCtx context.Context
+ terminateCtxCancel context.CancelFunc
+ exemplar interface{}
+ workers int
+ name string
}
// NewChannelQueue creates a memory channel queue
@@ -42,28 +46,30 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
if config.BatchLength == 0 {
config.BatchLength = 1
}
+
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
queue := &ChannelQueue{
- WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
}
queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
return queue, nil
}
// Run starts to run the queue
-func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), func() {
- log.Warn("ChannelQueue: %s is not shutdownable!", q.name)
- })
- atTerminate(context.Background(), func() {
- log.Warn("ChannelQueue: %s is not terminatable!", q.name)
- })
+func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
log.Debug("ChannelQueue: %s Starting", q.name)
- go func() {
- _ = q.AddWorkers(q.workers, 0)
- }()
+ _ = q.AddWorkers(q.workers, 0)
}
// Push will push data into the queue
@@ -75,6 +81,42 @@ func (q *ChannelQueue) Push(data Data) error {
return nil
}
+// Shutdown processing from this queue
+func (q *ChannelQueue) Shutdown() {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ select {
+ case <-q.shutdownCtx.Done():
+ log.Trace("ChannelQueue: %s Already Shutting down", q.name)
+ return
+ default:
+ }
+ log.Trace("ChannelQueue: %s Shutting down", q.name)
+ go func() {
+ log.Trace("ChannelQueue: %s Flushing", q.name)
+ if err := q.FlushWithContext(q.terminateCtx); err != nil {
+ log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
+ return
+ }
+ log.Debug("ChannelQueue: %s Flushed", q.name)
+ }()
+ q.shutdownCtxCancel()
+ log.Debug("ChannelQueue: %s Shutdown", q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ChannelQueue) Terminate() {
+ log.Trace("ChannelQueue: %s Terminating", q.name)
+ q.Shutdown()
+ select {
+ case <-q.terminateCtx.Done():
+ return
+ default:
+ }
+ q.terminateCtxCancel()
+ log.Debug("ChannelQueue: %s Terminated", q.name)
+}
+
// Name returns the name of this queue
func (q *ChannelQueue) Name() string {
return q.name