summaryrefslogtreecommitdiffstats
path: root/modules/queue/unique_queue_channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/unique_queue_channel.go')
-rw-r--r--modules/queue/unique_queue_channel.go77
1 files changed, 58 insertions, 19 deletions
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index dec1cfc5c0..5bec67c4d3 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -28,11 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
// only guaranteed whilst the task is waiting in the queue.
type ChannelUniqueQueue struct {
*WorkerPool
- lock sync.Mutex
- table map[Data]bool
- exemplar interface{}
- workers int
- name string
+ lock sync.Mutex
+ table map[Data]bool
+ shutdownCtx context.Context
+ shutdownCtxCancel context.CancelFunc
+ terminateCtx context.Context
+ terminateCtxCancel context.CancelFunc
+ exemplar interface{}
+ workers int
+ name string
}
// NewChannelUniqueQueue create a memory channel queue
@@ -45,11 +49,19 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
if config.BatchLength == 0 {
config.BatchLength = 1
}
+
+ terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
+ shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
+
queue := &ChannelUniqueQueue{
- table: map[Data]bool{},
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
+ table: map[Data]bool{},
+ shutdownCtx: shutdownCtx,
+ shutdownCtxCancel: shutdownCtxCancel,
+ terminateCtx: terminateCtx,
+ terminateCtxCancel: terminateCtxCancel,
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
@@ -65,17 +77,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
// Run starts to run the queue
-func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
- atShutdown(context.Background(), func() {
- log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
- })
- atTerminate(context.Background(), func() {
- log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
- })
+func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
+ atShutdown(q.Shutdown)
+ atTerminate(q.Terminate)
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
- go func() {
- _ = q.AddWorkers(q.workers, 0)
- }()
+ _ = q.AddWorkers(q.workers, 0)
}
// Push will push data into the queue if the data is not already in the queue
@@ -122,6 +128,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
return has, nil
}
+// Shutdown processing from this queue
+func (q *ChannelUniqueQueue) Shutdown() {
+ log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
+ select {
+ case <-q.shutdownCtx.Done():
+ return
+ default:
+ }
+ go func() {
+ log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
+ if err := q.FlushWithContext(q.terminateCtx); err != nil {
+ log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
+ return
+ }
+ log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
+ }()
+ q.shutdownCtxCancel()
+ log.Debug("ChannelUniqueQueue: %s Shutdown", q.name)
+}
+
+// Terminate this queue and close the queue
+func (q *ChannelUniqueQueue) Terminate() {
+ log.Trace("ChannelUniqueQueue: %s Terminating", q.name)
+ q.Shutdown()
+ select {
+ case <-q.terminateCtx.Done():
+ return
+ default:
+ }
+ q.terminateCtxCancel()
+ log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
+}
+
// Name returns the name of this queue
func (q *ChannelUniqueQueue) Name() string {
return q.name