summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/manager.go15
-rw-r--r--modules/queue/queue_disk_channel.go16
-rw-r--r--modules/queue/workerpool.go16
3 files changed, 32 insertions, 15 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 23e96155a9..310f3cd4e1 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -72,6 +72,8 @@ type ManagedPool interface {
BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+ // Done returns a channel that will be closed when the Pool's baseCtx is closed
+ Done() <-chan struct{}
}
// ManagedQueueList implements the sort.Interface
@@ -141,7 +143,6 @@ func (m *Manager) Remove(qid int64) {
delete(m.Queues, qid)
m.mutex.Unlock()
log.Trace("Queue Manager removed: QID: %d", qid)
-
}
// GetManagedQueue by qid
@@ -193,6 +194,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Done()
continue
}
+
+ if pool, ok := mq.Managed.(ManagedPool); ok {
+ // No point into flushing pools when their base's ctx is already done.
+ select {
+ case <-pool.Done():
+ wg.Done()
+ continue
+ default:
+ }
+ }
+
allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
log.Debug("Flushing (flushable) queue: %s", mq.Name)
@@ -225,7 +237,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Wait()
}
return nil
-
}
// ManagedQueues returns the managed queues
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index c3a1c5781e..72f330670a 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
q.internal.(*LevelQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelQueue).qid)
}
-
}
// Flush flushes the queue and blocks till the queue is empty
@@ -252,14 +251,13 @@ func (q *PersistableChannelQueue) Shutdown() {
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
- go func() {
- log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
- for data := range q.channelQueue.dataChan {
- _ = q.internal.Push(data)
- atomic.AddInt64(&q.channelQueue.numInQueue, -1)
- }
- log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
- }()
+ close(q.channelQueue.dataChan)
+ log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+ for data := range q.channelQueue.dataChan {
+ _ = q.internal.Push(data)
+ atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+ }
+ log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 0176e2e0b2..dc6ff3b633 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -65,6 +65,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
return pool
}
+// Done returns when this worker pool's base context has been cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+ return p.baseCtx.Done()
+}
+
// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
atomic.AddInt64(&p.numInQueue, 1)
@@ -90,7 +95,7 @@ func (p *WorkerPool) zeroBoost() {
boost = p.maxNumberOfWorkers - p.numberOfWorkers
}
if mq != nil {
- log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
+ log.Debug("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
start := time.Now()
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
@@ -98,7 +103,7 @@ func (p *WorkerPool) zeroBoost() {
mq.RemoveWorkers(pid)
}
} else {
- log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
+ log.Debug("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
}
p.lock.Unlock()
p.addWorkers(ctx, cancel, boost)
@@ -326,7 +331,10 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
log.Trace("WorkerPool: %d Flush", p.qid)
for {
select {
- case data := <-p.dataChan:
+ case data, ok := <-p.dataChan:
+ if !ok {
+ return nil
+ }
p.handle(data)
atomic.AddInt64(&p.numInQueue, -1)
case <-p.baseCtx.Done():
@@ -341,7 +349,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
func (p *WorkerPool) doWork(ctx context.Context) {
delay := time.Millisecond * 300
- var data = make([]Data, 0, p.batchLength)
+ data := make([]Data, 0, p.batchLength)
for {
select {
case <-ctx.Done():