aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
authorwxiaoguang <wxiaoguang@gmail.com>2024-11-15 23:45:07 +0800
committerGitHub <noreply@github.com>2024-11-15 23:45:07 +0800
commitecbb03dc6d0e0565663dff977a4fc3d40a1e0c1e (patch)
tree662c4fe1755d323811c3fd3abcb81e256a86326e /modules/queue
parenta0c0cb3a2c426e39b91e7301c94ddc3a988c8607 (diff)
downloadgitea-ecbb03dc6d0e0565663dff977a4fc3d40a1e0c1e.tar.gz
gitea-ecbb03dc6d0e0565663dff977a4fc3d40a1e0c1e.zip
Improve testing and try to fix MySQL hanging (#32515)
By some CI fine tunes (`run tests`), SQLite & MSSQL could complete in about 12~13 minutes (before > 14), MySQL could complete in 18 minutes (before: about 23 or even > 30) Major changes: 1. use tmpfs for MySQL storage 1. run `make test-mysql` instead of `make integration-test-coverage` because the code coverage is not really used at the moment. 1. refactor testlogger to make it more reliable and be able to report stuck stacktrace 1. do not requeue failed items when a queue is being flushed (failed items would keep failing and make flush uncompleted) 1. reduce the file sizes for testing 1. use math ChaCha20 random data instead of crypot/rand (for testing purpose only) 1. no need to `DeleteRepository` in `TestLinguist` 1. other related refactoring to make code easier to maintain
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/workergroup.go34
-rw-r--r--modules/queue/workerqueue.go5
2 files changed, 24 insertions, 15 deletions
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index 5859b64c0d..82b0790d5a 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -23,7 +23,7 @@ var (
)
func init() {
- unhandledItemRequeueDuration.Store(int64(5 * time.Second))
+ unhandledItemRequeueDuration.Store(int64(time.Second))
}
// workerGroup is a group of workers to work with a WorkerPoolQueue
@@ -104,7 +104,12 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
// if none of the items were handled, it should back-off for a few seconds
// in this case the handler (eg: document indexer) may have encountered some errors/failures
if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
+ if q.isFlushing.Load() {
+ return // do not requeue items when flushing, since all items failed, requeue them will continue failing.
+ }
log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
+ // TODO: ideally it shouldn't "sleep" here (blocks the worker, then blocks flush).
+ // It could debounce the requeue operation, and try to requeue the items in the future.
select {
case <-q.ctxRun.Done():
case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
@@ -193,6 +198,9 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
// doFlush flushes the queue: it tries to read all items from the queue and handles them.
// It is for testing purpose only. It's not designed to work for a cluster.
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
+ q.isFlushing.Store(true)
+ defer q.isFlushing.Store(false)
+
log.Debug("Queue %q starts flushing", q.GetName())
defer log.Debug("Queue %q finishes flushing", q.GetName())
@@ -236,6 +244,9 @@ loop:
emptyCounter := 0
for {
select {
+ case <-q.ctxRun.Done():
+ log.Debug("Queue %q is shutting down", q.GetName())
+ return
case data, dataOk := <-wg.popItemChan:
if !dataOk {
return
@@ -251,9 +262,6 @@ loop:
log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
}
return
- case <-q.ctxRun.Done():
- log.Debug("Queue %q is shutting down", q.GetName())
- return
case <-time.After(20 * time.Millisecond):
// There is no reliable way to make sure all queue items are consumed by the Flush, there always might be some items stored in some buffers/temp variables.
// If we run Gitea in a cluster, we can even not guarantee all items are consumed in a deterministic instance.
@@ -331,6 +339,15 @@ func (q *WorkerPoolQueue[T]) doRun() {
var batchDispatchC <-chan time.Time = infiniteTimerC
for {
select {
+ case flush := <-q.flushChan:
+ // before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
+ // after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
+ // since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
+ q.doDispatchBatchToWorker(wg, skipFlushChan)
+ q.doFlush(wg, flush)
+ case <-q.ctxRun.Done():
+ log.Debug("Queue %q is shutting down", q.GetName())
+ return
case data, dataOk := <-wg.popItemChan:
if !dataOk {
return
@@ -349,20 +366,11 @@ func (q *WorkerPoolQueue[T]) doRun() {
case <-batchDispatchC:
batchDispatchC = infiniteTimerC
q.doDispatchBatchToWorker(wg, q.flushChan)
- case flush := <-q.flushChan:
- // before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
- // after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
- // since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
- q.doDispatchBatchToWorker(wg, skipFlushChan)
- q.doFlush(wg, flush)
case err := <-wg.popItemErr:
if !q.isCtxRunCanceled() {
log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
}
return
- case <-q.ctxRun.Done():
- log.Debug("Queue %q is shutting down", q.GetName())
- return
}
}
}
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index f35ed93239..672e9a4114 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -32,8 +32,9 @@ type WorkerPoolQueue[T any] struct {
baseConfig *BaseConfig
baseQueue baseQueue
- batchChan chan []T
- flushChan chan flushType
+ batchChan chan []T
+ flushChan chan flushType
+ isFlushing atomic.Bool
batchLength int
workerNum int