summaryrefslogtreecommitdiffstats
path: root/modules/queue/unique_queue_wrapped.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/queue/unique_queue_wrapped.go')
-rw-r--r--modules/queue/unique_queue_wrapped.go7
1 files changed, 5 insertions, 2 deletions
diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go
index 8c815218dd..32fa9ed970 100644
--- a/modules/queue/unique_queue_wrapped.go
+++ b/modules/queue/unique_queue_wrapped.go
@@ -73,7 +73,7 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
// wrapped.handle is passed to the delayedStarting internal queue and is run to handle
// data passed to
- wrapped.handle = func(data ...Data) {
+ wrapped.handle = func(data ...Data) (unhandled []Data) {
for _, datum := range data {
wrapped.tlock.Lock()
if !wrapped.ready {
@@ -87,8 +87,11 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
}
wrapped.tlock.Unlock()
- handle(datum)
+ if u := handle(datum); u != nil {
+ unhandled = append(unhandled, u...)
+ }
}
+ return unhandled
}
_ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar)
return wrapped, nil