aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--modules/queue/workergroup.go20
-rw-r--r--modules/queue/workerqueue.go2
-rw-r--r--modules/queue/workerqueue_test.go29
3 files changed, 42 insertions, 9 deletions
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index 147a4f335e..e3801ef2b2 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -60,6 +60,9 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
full = true
}
+ // TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
+ // The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
+ // So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
q.workerNumMu.Lock()
noWorker := q.workerNum == 0
if full || noWorker {
@@ -143,7 +146,11 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
log.Debug("Queue %q starts new worker", q.GetName())
defer log.Debug("Queue %q stops idle worker", q.GetName())
+ atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
+
t := time.NewTicker(workerIdleDuration)
+ defer t.Stop()
+
keepWorking := true
stopWorking := func() {
q.workerNumMu.Lock()
@@ -158,13 +165,18 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
case batch, ok := <-q.batchChan:
if !ok {
stopWorking()
- } else {
- q.doWorkerHandle(batch)
- t.Reset(workerIdleDuration)
+ continue
+ }
+ q.doWorkerHandle(batch)
+ // reset the idle ticker, and drain the tick after reset in case a tick is already triggered
+ t.Reset(workerIdleDuration)
+ select {
+ case <-t.C:
+ default:
}
case <-t.C:
q.workerNumMu.Lock()
- keepWorking = q.workerNum <= 1
+ keepWorking = q.workerNum <= 1 // keep the last worker running
if !keepWorking {
q.workerNum--
}
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index b28fd88027..4160622d81 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -40,6 +40,8 @@ type WorkerPoolQueue[T any] struct {
workerMaxNum int
workerActiveNum int
workerNumMu sync.Mutex
+
+ workerStartedCounter int32
}
type flushType chan struct{}
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go
index e60120162a..e09669c542 100644
--- a/modules/queue/workerqueue_test.go
+++ b/modules/queue/workerqueue_test.go
@@ -11,6 +11,7 @@ import (
"time"
"code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/test"
"github.com/stretchr/testify/assert"
)
@@ -175,11 +176,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
}
func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
- oldWorkerIdleDuration := workerIdleDuration
- workerIdleDuration = 300 * time.Millisecond
- defer func() {
- workerIdleDuration = oldWorkerIdleDuration
- }()
+ defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)()
handler := func(items ...int) (unhandled []int) {
time.Sleep(100 * time.Millisecond)
@@ -250,3 +247,25 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
assert.EqualValues(t, 20, q.GetQueueItemNumber())
}
+
+func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
+ defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
+
+ handler := func(items ...int) (unhandled []int) {
+ time.Sleep(50 * time.Millisecond)
+ return nil
+ }
+
+ q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
+ stop := runWorkerPoolQueue(q)
+ for i := 0; i < 20; i++ {
+ assert.NoError(t, q.Push(i))
+ }
+
+ time.Sleep(500 * time.Millisecond)
+ assert.EqualValues(t, 2, q.GetWorkerNumber())
+ assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
+ // when the queue never becomes empty, the existing workers should keep working
+ assert.EqualValues(t, 2, q.workerStartedCounter)
+ stop()
+}