diff options
author | zeripath <art27@cantab.net> | 2022-01-25 23:09:57 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-26 01:09:57 +0200 |
commit | 713985b1a4c5d2faf997b5e96885e9d1a31250db (patch) | |
tree | b62a624b1ca1e6ee7345a8c35ac56132804d797f | |
parent | b53fd5ff9006af9d35e8de727f8ebbbb4bb30806 (diff) | |
download | gitea-713985b1a4c5d2faf997b5e96885e9d1a31250db.tar.gz gitea-713985b1a4c5d2faf997b5e96885e9d1a31250db.zip |
Prevent deadlocks in persistable channel pause test (#18410)
* Prevent deadlocks in persistable channel pause test
Because of reuse of the old paused/resumed channels in this test there
was a potential for deadlock. This PR ensures that the channels are always
reobtained.
It further adds some control code to detect hangs in future - and it
ensures that the pausing warning is not shown on shutdown.
Signed-off-by: Andrew Thornton <art27@cantab.net>
* do not warn but do pause
Signed-off-by: Andrew Thornton <art27@cantab.net>
-rw-r--r-- | modules/queue/queue_disk_channel_test.go | 48 | ||||
-rw-r--r-- | modules/queue/workerpool.go | 11 |
2 files changed, 53 insertions, 6 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 9bbd146efe..026982fd92 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -287,11 +287,16 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Nil(t, result2) pausable.Resume() + paused, resumed = pausable.IsPausedIsResumed() select { + case <-paused: + assert.Fail(t, "Queue should be resumed") + return case <-resumed: default: assert.Fail(t, "Queue should be resumed") + return } select { @@ -345,16 +350,22 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { pausable.Resume() + paused, resumed = pausable.IsPausedIsResumed() select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return case <-resumed: default: assert.Fail(t, "Queue should be resumed") + return } select { case result1 = <-handleChan: case <-time.After(500 * time.Millisecond): assert.Fail(t, "handler chan should contain test1") + return } assert.Equal(t, test1.TestString, result1.TestString) assert.Equal(t, test1.TestInt, result1.TestInt) @@ -369,7 +380,12 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { } // Wait til it is closed - <-queue.(*PersistableChannelQueue).closed + select { + case <-queue.(*PersistableChannelQueue).closed: + case <-time.After(5 * time.Second): + assert.Fail(t, "queue should close") + return + } err = queue.Push(&test1) assert.NoError(t, err) @@ -378,6 +394,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { select { case <-handleChan: assert.Fail(t, "Handler processing should have stopped") + return default: } @@ -393,6 +410,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { select { case <-handleChan: assert.Fail(t, "Handler processing should have stopped") + return default: } @@ -431,6 +449,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { select { case <-handleChan: assert.Fail(t, "Handler processing should have stopped") + return case <-paused: } @@ -449,13 +468,36 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { select { case <-handleChan: assert.Fail(t, "Handler processing should have stopped") + return default: } pausable.Resume() + paused, resumed = pausable.IsPausedIsResumed() + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + return + } - result3 := <-handleChan - result4 := <-handleChan + var result3, result4 *testData + + select { + case result3 = <-handleChan: + case <-time.After(1 * time.Second): + assert.Fail(t, "Handler processing should have resumed") + return + } + select { + case result4 = <-handleChan: + case <-time.After(1 * time.Second): + assert.Fail(t, "Handler processing should have resumed") + return + } if result4.TestString == test1.TestString { result3, result4 = result4, result3 } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 30dc8073c9..3eeebaa1a0 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -301,9 +301,14 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, cancel() } if p.hasNoWorkerScaling() { - log.Warn( - "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ - "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) + select { + case <-p.baseCtx.Done(): + // Don't warn if the baseCtx is shutdown + default: + log.Warn( + "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) + } p.pause() } p.lock.Unlock() |