summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/manager.go10
-rw-r--r--modules/queue/queue_bytefifo.go7
-rw-r--r--modules/queue/workerpool.go5
3 files changed, 22 insertions, 0 deletions
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 73c57540be..bba2c54ad2 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -84,6 +84,8 @@ type ManagedPool interface {
BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+ // NumberInQueue returns the total number of items in the pool
+ NumberInQueue() int64
// Done returns a channel that will be closed when the Pool's baseCtx is closed
Done() <-chan struct{}
}
@@ -427,6 +429,14 @@ func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, tim
}
}
+// NumberInQueue returns the number of items in the queue
+func (q *ManagedQueue) NumberInQueue() int64 {
+ if pool, ok := q.Managed.(ManagedPool); ok {
+ return pool.NumberInQueue()
+ }
+ return -1
+}
+
func (l ManagedQueueList) Len() int {
return len(l)
}
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index bf153d70bb..ead3828f33 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -135,6 +135,13 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
return q.byteFIFO.Len(q.terminateCtx) == 0
}
+// NumberInQueue returns the number in the queue
+func (q *ByteFIFOQueue) NumberInQueue() int64 {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ return q.byteFIFO.Len(q.terminateCtx) + q.WorkerPool.NumberInQueue()
+}
+
// Flush flushes the ByteFIFOQueue
func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
select {
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 100197c5e1..5f6ec18710 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -204,6 +204,11 @@ func (p *WorkerPool) NumberOfWorkers() int {
return p.numberOfWorkers
}
+// NumberInQueue returns the number of items in the queue
+func (p *WorkerPool) NumberInQueue() int64 {
+ return atomic.LoadInt64(&p.numInQueue)
+}
+
// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func (p *WorkerPool) MaxNumberOfWorkers() int {
p.lock.Lock()