aboutsummaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorwxiaoguang <wxiaoguang@gmail.com>2024-04-23 16:30:32 +0800
committerGitHub <noreply@github.com>2024-04-23 08:30:32 +0000
commit32f895f2d7b9d74b84f99f7ea130b62adc6769be (patch)
treee083ace0132e58dc5a211d1bb8313177a9127b50 /modules
parentd95408bd5d3db8552b8044ffcc8c9e3b31b31f2e (diff)
downloadgitea-32f895f2d7b9d74b84f99f7ea130b62adc6769be.tar.gz
gitea-32f895f2d7b9d74b84f99f7ea130b62adc6769be.zip
Fix queue test (#30646) (#30650)
Backport #30553 and #30646
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/backoff.go10
-rw-r--r--modules/queue/workergroup.go18
-rw-r--r--modules/queue/workerqueue.go2
-rw-r--r--modules/queue/workerqueue_test.go32
4 files changed, 43 insertions, 19 deletions
diff --git a/modules/queue/backoff.go b/modules/queue/backoff.go
index cda7233567..02fc88574a 100644
--- a/modules/queue/backoff.go
+++ b/modules/queue/backoff.go
@@ -8,7 +8,7 @@ import (
"time"
)
-const (
+var (
backoffBegin = 50 * time.Millisecond
backoffUpper = 2 * time.Second
)
@@ -18,6 +18,14 @@ type (
backoffFuncErr func() (retry bool, err error)
)
+func mockBackoffDuration(d time.Duration) func() {
+ oldBegin, oldUpper := backoffBegin, backoffUpper
+ backoffBegin, backoffUpper = d, d
+ return func() {
+ backoffBegin, backoffUpper = oldBegin, oldUpper
+ }
+}
+
func backoffRetErr[T any](ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncRetErr[T]) (ret T, err error) {
d := begin
for {
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index e3801ef2b2..153123f883 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -63,6 +63,8 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
+ // This data-race is not serious, as long as a new worker will be started soon to make sure there are enough workers,
+ // so no need to hugely refactor at the moment.
q.workerNumMu.Lock()
noWorker := q.workerNum == 0
if full || noWorker {
@@ -136,6 +138,14 @@ func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
return true
}
+func resetIdleTicker(t *time.Ticker, dur time.Duration) {
+ t.Reset(dur)
+ select {
+ case <-t.C:
+ default:
+ }
+}
+
// doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items.
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
wp.wg.Add(1)
@@ -146,8 +156,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
log.Debug("Queue %q starts new worker", q.GetName())
defer log.Debug("Queue %q stops idle worker", q.GetName())
- atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
-
t := time.NewTicker(workerIdleDuration)
defer t.Stop()
@@ -169,11 +177,7 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
}
q.doWorkerHandle(batch)
// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
- t.Reset(workerIdleDuration)
- select {
- case <-t.C:
- default:
- }
+ resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
case <-t.C:
q.workerNumMu.Lock()
keepWorking = q.workerNum <= 1 // keep the last worker running
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index 4160622d81..b28fd88027 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct {
workerMaxNum int
workerActiveNum int
workerNumMu sync.Mutex
-
- workerStartedCounter int32
}
type flushType chan struct{}
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go
index e09669c542..d66253ff66 100644
--- a/modules/queue/workerqueue_test.go
+++ b/modules/queue/workerqueue_test.go
@@ -5,8 +5,10 @@ package queue
import (
"context"
+ "slices"
"strconv"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -250,22 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
+ defer mockBackoffDuration(5 * time.Millisecond)()
+ var q *WorkerPoolQueue[int]
+ var handledCount atomic.Int32
+ var hasOnlyOneWorkerRunning atomic.Bool
handler := func(items ...int) (unhandled []int) {
- time.Sleep(50 * time.Millisecond)
+ handledCount.Add(int32(len(items)))
+ // make each work have different duration, and check the active worker number periodically
+ var activeNums []int
+ for i := 0; i < 5-items[0]%2; i++ {
+ time.Sleep(workerIdleDuration * 2)
+ activeNums = append(activeNums, q.GetWorkerActiveNumber())
+ }
+ // When the queue never becomes empty, the existing workers should keep working
+ // It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
+ // If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
+ if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) {
+ hasOnlyOneWorkerRunning.Store(true)
+ }
return nil
}
-
- q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
+ q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
stop := runWorkerPoolQueue(q)
- for i := 0; i < 20; i++ {
+ for i := 0; i < 100; i++ {
assert.NoError(t, q.Push(i))
}
-
time.Sleep(500 * time.Millisecond)
- assert.EqualValues(t, 2, q.GetWorkerNumber())
- assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
- // when the queue never becomes empty, the existing workers should keep working
- assert.EqualValues(t, 2, q.workerStartedCounter)
+ assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test
+ assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting")
stop()
}