summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-04-02 08:59:04 +0100
committerGitHub <noreply@github.com>2022-04-02 15:59:04 +0800
commit7b4c3c7bb12504f107122ce799948daf1e36b3e8 (patch)
treefeabbf5219d1ce65ab3e3c60e7756303cd33f544 /modules
parentcf5d4a7230e716f77a54a1591e90b34b21b2a7b8 (diff)
downloadgitea-7b4c3c7bb12504f107122ce799948daf1e36b3e8.tar.gz
gitea-7b4c3c7bb12504f107122ce799948daf1e36b3e8.zip
Prevent intermittent NPE in queue tests (#19301)
There appears to be an intermittent NPE in queue tests relating to the deferred shutdown/terminate functions. This PR more formally asserts that shutdown and termination occurs before starting and finishing the tests but leaves the defer in place to ensure that if there is an issue shutdown/termination will occur. Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules')
-rw-r--r--modules/queue/queue_channel_test.go49
-rw-r--r--modules/queue/queue_disk_channel_test.go76
2 files changed, 97 insertions, 28 deletions
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index d30b908861..949c452893 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -128,6 +128,8 @@ func TestChannelQueue_Pause(t *testing.T) {
queueShutdown := []func(){}
queueTerminate := []func(){}
+ terminated := make(chan struct{})
+
queue, err = NewChannelQueue(handle,
ChannelQueueConfiguration{
WorkerPoolConfiguration: WorkerPoolConfiguration{
@@ -142,15 +144,18 @@ func TestChannelQueue_Pause(t *testing.T) {
}, &testData{})
assert.NoError(t, err)
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
+ go func() {
+ queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+ close(terminated)
+ }()
// Shutdown and Terminate in defer
defer func() {
@@ -278,4 +283,30 @@ func TestChannelQueue_Pause(t *testing.T) {
}
assert.Equal(t, test1.TestString, result1.TestString)
assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ lock.Lock()
+ callbacks := make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ queueShutdown = queueShutdown[:0]
+ lock.Unlock()
+ // Now shutdown the queue
+ for _, callback := range callbacks {
+ callback()
+ }
+
+ // terminate the queue
+ lock.Lock()
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ queueShutdown = queueTerminate[:0]
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ select {
+ case <-terminated:
+ case <-time.After(10 * time.Second):
+ assert.Fail(t, "Queue should have terminated")
+ return
+ }
}
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index f092bb1f56..22b4f0f452 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -221,6 +221,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
queueShutdown := []func(){}
queueTerminate := []func(){}
+ terminated := make(chan struct{})
tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data")
assert.NoError(t, err)
@@ -237,15 +238,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
}, &testData{})
assert.NoError(t, err)
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
+ go func() {
+ queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+ close(terminated)
+ }()
// Shutdown and Terminate in defer
defer func() {
@@ -417,7 +421,10 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
case <-handleChan:
assert.Fail(t, "Handler processing should have stopped")
return
- default:
+ case <-terminated:
+ case <-time.After(10 * time.Second):
+ assert.Fail(t, "Queue should have terminated")
+ return
}
lock.Lock()
@@ -425,6 +432,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
lock.Unlock()
// Reopen queue
+ terminated = make(chan struct{})
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
DataDir: tmpDir,
BatchLength: 1,
@@ -442,15 +450,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
paused, _ = pausable.IsPausedIsResumed()
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
+ go func() {
+ queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+ close(terminated)
+ }()
select {
case <-handleChan:
@@ -510,4 +521,31 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt)
+
+ lock.Lock()
+ callbacks = make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ queueShutdown = queueShutdown[:0]
+ lock.Unlock()
+ // Now shutdown the queue
+ for _, callback := range callbacks {
+ callback()
+ }
+
+ // terminate the queue
+ lock.Lock()
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ queueShutdown = queueTerminate[:0]
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+
+ select {
+ case <-time.After(10 * time.Second):
+ assert.Fail(t, "Queue should have terminated")
+ return
+ case <-terminated:
+ }
}