summaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_disk_channel_test.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-01-22 21:22:14 +0000
committerGitHub <noreply@github.com>2022-01-22 21:22:14 +0000
commita82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch)
treecb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/queue/queue_disk_channel_test.go
parent27ee01e1e866f2f13603af65224ddae77d5149d7 (diff)
downloadgitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz
gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip
Pause queues (#15928)
* Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton <art27@cantab.net> * Create pushback interface Signed-off-by: Andrew Thornton <art27@cantab.net> * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton <art27@cantab.net> * Wire in UI for pausing Signed-off-by: Andrew Thornton <art27@cantab.net> * add testcases and fix a few issues Signed-off-by: Andrew Thornton <art27@cantab.net> * fix build Signed-off-by: Andrew Thornton <art27@cantab.net> * prevent "race" in the test Signed-off-by: Andrew Thornton <art27@cantab.net> * fix jsoniter mismerge Signed-off-by: Andrew Thornton <art27@cantab.net> * fix conflicts Signed-off-by: Andrew Thornton <art27@cantab.net> * fix format Signed-off-by: Andrew Thornton <art27@cantab.net> * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton <art27@cantab.net> * Use StopTimer Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/queue/queue_disk_channel_test.go')
-rw-r--r--modules/queue/queue_disk_channel_test.go292
1 files changed, 291 insertions, 1 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index db12d9575c..9bbd146efe 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -8,7 +8,9 @@ import (
"os"
"sync"
"testing"
+ "time"
+ "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"
"github.com/stretchr/testify/assert"
@@ -16,7 +18,7 @@ import (
func TestPersistableChannelQueue(t *testing.T) {
handleChan := make(chan *testData)
- handle := func(data ...Data) {
+ handle := func(data ...Data) []Data {
for _, datum := range data {
if datum == nil {
continue
@@ -24,6 +26,7 @@ func TestPersistableChannelQueue(t *testing.T) {
testDatum := datum.(*testData)
handleChan <- testDatum
}
+ return nil
}
lock := sync.Mutex{}
@@ -189,3 +192,290 @@ func TestPersistableChannelQueue(t *testing.T) {
callback()
}
}
+
+func TestPersistableChannelQueue_Pause(t *testing.T) {
+ lock := sync.Mutex{}
+ var queue Queue
+ var err error
+ pushBack := false
+
+ handleChan := make(chan *testData)
+ handle := func(data ...Data) []Data {
+ lock.Lock()
+ if pushBack {
+ if pausable, ok := queue.(Pausable); ok {
+ log.Info("pausing")
+ pausable.Pause()
+ }
+ pushBack = false
+ lock.Unlock()
+ return data
+ }
+ lock.Unlock()
+
+ for _, datum := range data {
+ testDatum := datum.(*testData)
+ handleChan <- testDatum
+ }
+ return nil
+ }
+
+ queueShutdown := []func(){}
+ queueTerminate := []func(){}
+
+ tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data")
+ assert.NoError(t, err)
+ defer util.RemoveAll(tmpDir)
+
+ queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+ DataDir: tmpDir,
+ BatchLength: 2,
+ QueueLength: 20,
+ Workers: 1,
+ BoostWorkers: 0,
+ MaxWorkers: 10,
+ Name: "first",
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
+ test1 := testData{"A", 1}
+ test2 := testData{"B", 2}
+
+ err = queue.Push(&test1)
+ assert.NoError(t, err)
+
+ pausable, ok := queue.(Pausable)
+ if !assert.True(t, ok) {
+ return
+ }
+ result1 := <-handleChan
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ pausable.Pause()
+ paused, resumed := pausable.IsPausedIsResumed()
+
+ select {
+ case <-paused:
+ case <-resumed:
+ assert.Fail(t, "Queue should not be resumed")
+ return
+ default:
+ assert.Fail(t, "Queue is not paused")
+ return
+ }
+
+ queue.Push(&test2)
+
+ var result2 *testData
+ select {
+ case result2 = <-handleChan:
+ assert.Fail(t, "handler chan should be empty")
+ case <-time.After(100 * time.Millisecond):
+ }
+
+ assert.Nil(t, result2)
+
+ pausable.Resume()
+
+ select {
+ case <-resumed:
+ default:
+ assert.Fail(t, "Queue should be resumed")
+ }
+
+ select {
+ case result2 = <-handleChan:
+ case <-time.After(500 * time.Millisecond):
+ assert.Fail(t, "handler chan should contain test2")
+ }
+
+ assert.Equal(t, test2.TestString, result2.TestString)
+ assert.Equal(t, test2.TestInt, result2.TestInt)
+
+ lock.Lock()
+ pushBack = true
+ lock.Unlock()
+
+ paused, resumed = pausable.IsPausedIsResumed()
+
+ select {
+ case <-paused:
+ assert.Fail(t, "Queue should not be paused")
+ return
+ case <-resumed:
+ default:
+ assert.Fail(t, "Queue is not resumed")
+ return
+ }
+
+ queue.Push(&test1)
+
+ select {
+ case <-paused:
+ case <-handleChan:
+ assert.Fail(t, "handler chan should not contain test1")
+ return
+ case <-time.After(500 * time.Millisecond):
+ assert.Fail(t, "queue should be paused")
+ return
+ }
+
+ paused, resumed = pausable.IsPausedIsResumed()
+
+ select {
+ case <-paused:
+ case <-resumed:
+ assert.Fail(t, "Queue should not be resumed")
+ return
+ default:
+ assert.Fail(t, "Queue is not paused")
+ return
+ }
+
+ pausable.Resume()
+
+ select {
+ case <-resumed:
+ default:
+ assert.Fail(t, "Queue should be resumed")
+ }
+
+ select {
+ case result1 = <-handleChan:
+ case <-time.After(500 * time.Millisecond):
+ assert.Fail(t, "handler chan should contain test1")
+ }
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ lock.Lock()
+ callbacks := make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ lock.Unlock()
+ // Now shutdown the queue
+ for _, callback := range callbacks {
+ callback()
+ }
+
+ // Wait til it is closed
+ <-queue.(*PersistableChannelQueue).closed
+
+ err = queue.Push(&test1)
+ assert.NoError(t, err)
+ err = queue.Push(&test2)
+ assert.NoError(t, err)
+ select {
+ case <-handleChan:
+ assert.Fail(t, "Handler processing should have stopped")
+ default:
+ }
+
+ // terminate the queue
+ lock.Lock()
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+
+ select {
+ case <-handleChan:
+ assert.Fail(t, "Handler processing should have stopped")
+ default:
+ }
+
+ lock.Lock()
+ pushBack = true
+ lock.Unlock()
+
+ // Reopen queue
+ queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+ DataDir: tmpDir,
+ BatchLength: 1,
+ QueueLength: 20,
+ Workers: 1,
+ BoostWorkers: 0,
+ MaxWorkers: 10,
+ Name: "second",
+ }, &testData{})
+ assert.NoError(t, err)
+ pausable, ok = queue.(Pausable)
+ if !assert.True(t, ok) {
+ return
+ }
+
+ paused, _ = pausable.IsPausedIsResumed()
+
+ go queue.Run(func(shutdown func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(terminate func()) {
+ lock.Lock()
+ defer lock.Unlock()
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
+ select {
+ case <-handleChan:
+ assert.Fail(t, "Handler processing should have stopped")
+ case <-paused:
+ }
+
+ paused, resumed = pausable.IsPausedIsResumed()
+
+ select {
+ case <-paused:
+ case <-resumed:
+ assert.Fail(t, "Queue should not be resumed")
+ return
+ default:
+ assert.Fail(t, "Queue is not paused")
+ return
+ }
+
+ select {
+ case <-handleChan:
+ assert.Fail(t, "Handler processing should have stopped")
+ default:
+ }
+
+ pausable.Resume()
+
+ result3 := <-handleChan
+ result4 := <-handleChan
+ if result4.TestString == test1.TestString {
+ result3, result4 = result4, result3
+ }
+ assert.Equal(t, test1.TestString, result3.TestString)
+ assert.Equal(t, test1.TestInt, result3.TestInt)
+
+ assert.Equal(t, test2.TestString, result4.TestString)
+ assert.Equal(t, test2.TestInt, result4.TestInt)
+ lock.Lock()
+ callbacks = make([]func(), len(queueShutdown))
+ copy(callbacks, queueShutdown)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+ lock.Lock()
+ callbacks = make([]func(), len(queueTerminate))
+ copy(callbacks, queueTerminate)
+ lock.Unlock()
+ for _, callback := range callbacks {
+ callback()
+ }
+}