diff options
Diffstat (limited to 'modules/queue/unique_queue_channel.go')
-rw-r--r-- | modules/queue/unique_queue_channel.go | 77 |
1 files changed, 58 insertions, 19 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index dec1cfc5c0..5bec67c4d3 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -28,11 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration // only guaranteed whilst the task is waiting in the queue. type ChannelUniqueQueue struct { *WorkerPool - lock sync.Mutex - table map[Data]bool - exemplar interface{} - workers int - name string + lock sync.Mutex + table map[Data]bool + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelUniqueQueue create a memory channel queue @@ -45,11 +49,19 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + table: map[Data]bool{}, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { @@ -65,17 +77,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } // Run starts to run the queue -func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) - }) - atTerminate(context.Background(), func() { - log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name) - }) +func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelUniqueQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue if the data is not already in the queue @@ -122,6 +128,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { return has, nil } +// Shutdown processing from this queue +func (q *ChannelUniqueQueue) Shutdown() { + log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) + select { + case <-q.shutdownCtx.Done(): + return + default: + } + go func() { + log.Trace("ChannelUniqueQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelUniqueQueue: %s Flushed", q.name) + }() + q.shutdownCtxCancel() + log.Debug("ChannelUniqueQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelUniqueQueue) Terminate() { + log.Trace("ChannelUniqueQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCtxCancel() + log.Debug("ChannelUniqueQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelUniqueQueue) Name() string { return q.name |