aboutsummaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/queue_disk_channel.go6
-rw-r--r--modules/queue/unique_queue_disk_channel.go10
-rw-r--r--modules/queue/workerpool.go42
3 files changed, 48 insertions, 10 deletions
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index 433435c301..801fd8a122 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -75,10 +75,10 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
BatchLength: config.BatchLength,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- MaxWorkers: 6,
+ BoostWorkers: 1,
+ MaxWorkers: 5,
},
- Workers: 1,
+ Workers: 0,
Name: config.Name + "-level",
},
DataDir: config.DataDir,
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index 4a69b43eae..47c4f2bdd5 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -73,12 +73,12 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: config.QueueLength,
BatchLength: config.BatchLength,
- BlockTimeout: 0,
- BoostTimeout: 0,
- BoostWorkers: 0,
- MaxWorkers: 1,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 1,
+ MaxWorkers: 5,
},
- Workers: 1,
+ Workers: 0,
Name: config.Name + "-level",
},
DataDir: config.DataDir,
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 45378e3dae..0f15ccac9e 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -70,7 +70,11 @@ func (p *WorkerPool) Push(data Data) {
atomic.AddInt64(&p.numInQueue, 1)
p.lock.Lock()
if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
- p.lock.Unlock()
+ if p.numberOfWorkers == 0 {
+ p.zeroBoost()
+ } else {
+ p.lock.Unlock()
+ }
p.pushBoost(data)
} else {
p.lock.Unlock()
@@ -78,6 +82,40 @@ func (p *WorkerPool) Push(data Data) {
}
}
+func (p *WorkerPool) zeroBoost() {
+ ctx, cancel := context.WithCancel(p.baseCtx)
+ mq := GetManager().GetManagedQueue(p.qid)
+ boost := p.boostWorkers
+ if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
+ boost = p.maxNumberOfWorkers - p.numberOfWorkers
+ }
+ if mq != nil {
+ log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
+
+ start := time.Now()
+ pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-time.After(p.boostTimeout):
+ }
+ mq.RemoveWorkers(pid)
+ cancel()
+ }()
+ } else {
+ log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-time.After(p.boostTimeout):
+ }
+ cancel()
+ }()
+ }
+ p.lock.Unlock()
+ p.addWorkers(ctx, boost)
+}
+
func (p *WorkerPool) pushBoost(data Data) {
select {
case p.dataChan <- data:
@@ -112,7 +150,7 @@ func (p *WorkerPool) pushBoost(data Data) {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now()
- pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
+ pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
go func() {
<-ctx.Done()
mq.RemoveWorkers(pid)