author    wxiaoguang <wxiaoguang@gmail.com>    2024-11-15 23:45:07 +0800
committer GitHub <noreply@github.com>          2024-11-15 23:45:07 +0800
commit    ecbb03dc6d0e0565663dff977a4fc3d40a1e0c1e (patch)
tree      662c4fe1755d323811c3fd3abcb81e256a86326e /modules/queue
parent    a0c0cb3a2c426e39b91e7301c94ddc3a988c8607 (diff)
Improve testing and try to fix MySQL hanging (#32515)
With some CI fine-tuning (`run tests`), the SQLite & MSSQL jobs now complete in about 12~13 minutes (before: > 14), and the MySQL job completes in about 18 minutes (before: about 23, sometimes > 30).
Major changes:
1. use tmpfs for MySQL storage
1. run `make test-mysql` instead of `make integration-test-coverage`
because the code coverage is not really used at the moment.
1. refactor testlogger to make it more reliable and able to report the stack traces of stuck tests (a rough illustration of the stack-dump idea is sketched after this list)
1. do not requeue failed items when a queue is being flushed (failed items would keep failing and prevent the flush from ever completing)
1. reduce the sizes of the files generated for testing
1. use ChaCha20-based math random data instead of crypto/rand (for testing purposes only; see the second sketch after this list)
1. no need to `DeleteRepository` in `TestLinguist`
1. other related refactoring to make code easier to maintain
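
The testlogger change itself is outside this diffstat (it is limited to 'modules/queue'), so the following is only a rough, hypothetical illustration of the "report stuck stacktrace" idea: a watchdog that dumps all goroutine stacks when a test runs past a deadline. The package and the `WatchStuck` helper are invented for this sketch and are not Gitea's actual testlogger code.

```go
// Sketch only: dump all goroutine stacks if a test runs longer than expected.
// The name WatchStuck is hypothetical, not part of Gitea's testlogger.
package testwatch

import (
	"fmt"
	"runtime"
	"time"
)

// WatchStuck returns a stop function; if stop is not called within timeout,
// it prints the stack traces of all goroutines so a hanging test can be
// diagnosed instead of ending in a silent CI timeout.
func WatchStuck(name string, timeout time.Duration) (stop func()) {
	done := make(chan struct{})
	go func() {
		select {
		case <-done:
			return
		case <-time.After(timeout):
			buf := make([]byte, 1<<20) // 1 MiB is usually enough for all stacks
			n := runtime.Stack(buf, true)
			fmt.Printf("test %q seems stuck, goroutine stacks:\n%s\n", name, buf[:n])
		}
	}()
	return func() { close(done) }
}
```

A test helper could call something like `defer WatchStuck(t.Name(), 5*time.Minute)()` so that a hang surfaces as a stack dump in the log rather than an opaque job timeout.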
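Likewise, the random-data helper is not part of this diff. Below is a minimal sketch of the idea, assuming `golang.org/x/crypto/chacha20` as the keystream source (the real helper may differ): a deterministic ChaCha20 keystream is far cheaper than `crypto/rand` for generating large test files and is reproducible across runs.

```go
// Sketch only (not Gitea's actual helper): deterministic pseudo-random test
// data from a ChaCha20 keystream. For test data generation only, never for
// anything security related.
package testrand

import "golang.org/x/crypto/chacha20"

// Reader yields a deterministic byte stream from a fixed key/nonce.
type Reader struct {
	cipher *chacha20.Cipher
}

func NewReader() *Reader {
	key := make([]byte, chacha20.KeySize)     // all-zero key: fine for tests
	nonce := make([]byte, chacha20.NonceSize) // all-zero nonce: fine for tests
	c, err := chacha20.NewUnauthenticatedCipher(key, nonce)
	if err != nil {
		panic(err) // only fails on wrong key/nonce sizes
	}
	return &Reader{cipher: c}
}

// Read fills p with raw keystream bytes (XOR of the keystream with zeros).
func (r *Reader) Read(p []byte) (int, error) {
	for i := range p {
		p[i] = 0
	}
	r.cipher.XORKeyStream(p, p)
	return len(p), nil
}
```

A test could then write a large file with `io.Copy(f, io.LimitReader(NewReader(), size))` and get identical content on every run.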
Diffstat (limited to 'modules/queue')
-rw-r--r-- | modules/queue/workergroup.go | 34 |
-rw-r--r-- | modules/queue/workerqueue.go | 5 |
2 files changed, 24 insertions, 15 deletions
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
index 5859b64c0d..82b0790d5a 100644
--- a/modules/queue/workergroup.go
+++ b/modules/queue/workergroup.go
@@ -23,7 +23,7 @@ var (
 )
 
 func init() {
-	unhandledItemRequeueDuration.Store(int64(5 * time.Second))
+	unhandledItemRequeueDuration.Store(int64(time.Second))
 }
 
 // workerGroup is a group of workers to work with a WorkerPoolQueue
@@ -104,7 +104,12 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
 	// if none of the items were handled, it should back-off for a few seconds
 	// in this case the handler (eg: document indexer) may have encountered some errors/failures
 	if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
+		if q.isFlushing.Load() {
+			return // do not requeue items when flushing, since all items failed, requeue them will continue failing.
+		}
 		log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
+		// TODO: ideally it shouldn't "sleep" here (blocks the worker, then blocks flush).
+		// It could debounce the requeue operation, and try to requeue the items in the future.
 		select {
 		case <-q.ctxRun.Done():
 		case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
@@ -193,6 +198,9 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 // doFlush flushes the queue: it tries to read all items from the queue and handles them.
 // It is for testing purpose only. It's not designed to work for a cluster.
 func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
+	q.isFlushing.Store(true)
+	defer q.isFlushing.Store(false)
+
 	log.Debug("Queue %q starts flushing", q.GetName())
 	defer log.Debug("Queue %q finishes flushing", q.GetName())
 
@@ -236,6 +244,9 @@ loop:
 	emptyCounter := 0
 	for {
 		select {
+		case <-q.ctxRun.Done():
+			log.Debug("Queue %q is shutting down", q.GetName())
+			return
 		case data, dataOk := <-wg.popItemChan:
 			if !dataOk {
 				return
@@ -251,9 +262,6 @@
 				log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
 			}
 			return
-		case <-q.ctxRun.Done():
-			log.Debug("Queue %q is shutting down", q.GetName())
-			return
 		case <-time.After(20 * time.Millisecond):
 			// There is no reliable way to make sure all queue items are consumed by the Flush, there always might be some items stored in some buffers/temp variables.
 			// If we run Gitea in a cluster, we can even not guarantee all items are consumed in a deterministic instance.
@@ -331,6 +339,15 @@ func (q *WorkerPoolQueue[T]) doRun() {
 	var batchDispatchC <-chan time.Time = infiniteTimerC
 	for {
 		select {
+		case flush := <-q.flushChan:
+			// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
+			// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
+			// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
+			q.doDispatchBatchToWorker(wg, skipFlushChan)
+			q.doFlush(wg, flush)
+		case <-q.ctxRun.Done():
+			log.Debug("Queue %q is shutting down", q.GetName())
+			return
 		case data, dataOk := <-wg.popItemChan:
 			if !dataOk {
 				return
@@ -349,20 +366,11 @@ func (q *WorkerPoolQueue[T]) doRun() {
 		case <-batchDispatchC:
 			batchDispatchC = infiniteTimerC
 			q.doDispatchBatchToWorker(wg, q.flushChan)
-		case flush := <-q.flushChan:
-			// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
-			// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
-			// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
-			q.doDispatchBatchToWorker(wg, skipFlushChan)
-			q.doFlush(wg, flush)
 		case err := <-wg.popItemErr:
 			if !q.isCtxRunCanceled() {
 				log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
 			}
 			return
-		case <-q.ctxRun.Done():
-			log.Debug("Queue %q is shutting down", q.GetName())
-			return
 		}
 	}
 }
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index f35ed93239..672e9a4114 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -32,8 +32,9 @@ type WorkerPoolQueue[T any] struct {
 	baseConfig *BaseConfig
 	baseQueue  baseQueue
 
-	batchChan chan []T
-	flushChan chan flushType
+	batchChan  chan []T
+	flushChan  chan flushType
+	isFlushing atomic.Bool
 
 	batchLength int
 	workerNum   int
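
For context on the workergroup.go hunks above: without the `isFlushing` guard, a handler that rejects every item keeps the back-off/requeue loop busy and a flush can never drain the queue. The following is a self-contained sketch of that interaction using invented types (`tinyQueue` is not the `WorkerPoolQueue` API); it only illustrates why requeueing is skipped while a flush is in progress.

```go
// Sketch only: shows the "skip requeue while flushing" guard in isolation.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type tinyQueue struct {
	items      chan string
	isFlushing atomic.Bool
}

// handleBatch mimics the worker handler: items the handler rejects are
// normally requeued after a back-off, but requeueing is skipped while a
// flush runs, because items that just failed would keep failing and stall it.
func (q *tinyQueue) handleBatch(batch []string, handler func([]string) (unhandled []string)) {
	unhandled := handler(batch)
	if len(unhandled) == len(batch) {
		if q.isFlushing.Load() {
			return // drop instead of requeue, so the flush can terminate
		}
		time.Sleep(10 * time.Millisecond) // back-off before requeueing
		for _, it := range unhandled {
			q.items <- it
		}
	}
}

// flush drains the queue, handing every item to the handler.
func (q *tinyQueue) flush(handler func([]string) []string) {
	q.isFlushing.Store(true)
	defer q.isFlushing.Store(false)
	for {
		select {
		case it := <-q.items:
			q.handleBatch([]string{it}, handler)
		default:
			return // queue drained
		}
	}
}

func main() {
	q := &tinyQueue{items: make(chan string, 16)}
	q.items <- "a"
	q.items <- "b"
	failAll := func(batch []string) []string { return batch } // handler that always fails
	q.flush(failAll)
	fmt.Println("flush completed, remaining items:", len(q.items)) // prints 0
}
```

Run as written, the flush completes even though every item fails; remove the `isFlushing` check and the same run turns into an endless requeue loop, which is the hang this part of the change addresses.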