summaryrefslogtreecommitdiffstats
path: root/modules/queue
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-01-25 23:09:57 +0000
committerGitHub <noreply@github.com>2022-01-26 01:09:57 +0200
commit713985b1a4c5d2faf997b5e96885e9d1a31250db (patch)
treeb62a624b1ca1e6ee7345a8c35ac56132804d797f /modules/queue
parentb53fd5ff9006af9d35e8de727f8ebbbb4bb30806 (diff)
downloadgitea-713985b1a4c5d2faf997b5e96885e9d1a31250db.tar.gz
gitea-713985b1a4c5d2faf997b5e96885e9d1a31250db.zip
Prevent deadlocks in persistable channel pause test (#18410)
* Prevent deadlocks in persistable channel pause test Because of reuse of the old paused/resumed channels in this test there was a potential for deadlock. This PR ensures that the channels are always reobtained. It further adds some control code to detect hangs in future - and it ensures that the pausing warning is not shown on shutdown. Signed-off-by: Andrew Thornton <art27@cantab.net> * do not warn but do pause Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/queue')
-rw-r--r--modules/queue/queue_disk_channel_test.go48
-rw-r--r--modules/queue/workerpool.go11
2 files changed, 53 insertions, 6 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index 9bbd146efe..026982fd92 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -287,11 +287,16 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Nil(t, result2)
pausable.Resume()
+ paused, resumed = pausable.IsPausedIsResumed()
select {
+ case <-paused:
+ assert.Fail(t, "Queue should be resumed")
+ return
case <-resumed:
default:
assert.Fail(t, "Queue should be resumed")
+ return
}
select {
@@ -345,16 +350,22 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
pausable.Resume()
+ paused, resumed = pausable.IsPausedIsResumed()
select {
+ case <-paused:
+ assert.Fail(t, "Queue should not be paused")
+ return
case <-resumed:
default:
assert.Fail(t, "Queue should be resumed")
+ return
}
select {
case result1 = <-handleChan:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "handler chan should contain test1")
+ return
}
assert.Equal(t, test1.TestString, result1.TestString)
assert.Equal(t, test1.TestInt, result1.TestInt)
@@ -369,7 +380,12 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
}
// Wait til it is closed
- <-queue.(*PersistableChannelQueue).closed
+ select {
+ case <-queue.(*PersistableChannelQueue).closed:
+ case <-time.After(5 * time.Second):
+ assert.Fail(t, "queue should close")
+ return
+ }
err = queue.Push(&test1)
assert.NoError(t, err)
@@ -378,6 +394,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
+ return
default:
}
@@ -393,6 +410,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
+ return
default:
}
@@ -431,6 +449,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
+ return
case <-paused:
}
@@ -449,13 +468,36 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
select {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
+ return
default:
}
pausable.Resume()
+ paused, resumed = pausable.IsPausedIsResumed()
+ select {
+ case <-paused:
+ assert.Fail(t, "Queue should not be paused")
+ return
+ case <-resumed:
+ default:
+ assert.Fail(t, "Queue should be resumed")
+ return
+ }
- result3 := <-handleChan
- result4 := <-handleChan
+ var result3, result4 *testData
+
+ select {
+ case result3 = <-handleChan:
+ case <-time.After(1 * time.Second):
+ assert.Fail(t, "Handler processing should have resumed")
+ return
+ }
+ select {
+ case result4 = <-handleChan:
+ case <-time.After(1 * time.Second):
+ assert.Fail(t, "Handler processing should have resumed")
+ return
+ }
if result4.TestString == test1.TestString {
result3, result4 = result4, result3
}
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 30dc8073c9..3eeebaa1a0 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -301,9 +301,14 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
cancel()
}
if p.hasNoWorkerScaling() {
- log.Warn(
- "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
- "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
+ select {
+ case <-p.baseCtx.Done():
+ // Don't warn if the baseCtx is shutdown
+ default:
+ log.Warn(
+ "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
+ "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
+ }
p.pause()
}
p.lock.Unlock()