aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/queue_channel_test.go
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-01-22 21:22:14 +0000
committerGitHub <noreply@github.com>2022-01-22 21:22:14 +0000
commita82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch)
treecb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/queue/queue_channel_test.go
parent27ee01e1e866f2f13603af65224ddae77d5149d7 (diff)
downloadgitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz
gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip
Pause queues (#15928)
* Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton <art27@cantab.net> * Create pushback interface Signed-off-by: Andrew Thornton <art27@cantab.net> * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton <art27@cantab.net> * Wire in UI for pausing Signed-off-by: Andrew Thornton <art27@cantab.net> * add testcases and fix a few issues Signed-off-by: Andrew Thornton <art27@cantab.net> * fix build Signed-off-by: Andrew Thornton <art27@cantab.net> * prevent "race" in the test Signed-off-by: Andrew Thornton <art27@cantab.net> * fix jsoniter mismerge Signed-off-by: Andrew Thornton <art27@cantab.net> * fix conflicts Signed-off-by: Andrew Thornton <art27@cantab.net> * fix format Signed-off-by: Andrew Thornton <art27@cantab.net> * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton <art27@cantab.net> * Use StopTimer Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/queue/queue_channel_test.go')
-rw-r--r--modules/queue/queue_channel_test.go160
1 files changed, 158 insertions, 2 deletions
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index f1ddd7ec92..b700b28a14 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -5,6 +5,7 @@
package queue
import (
+ "sync"
"testing"
"time"
@@ -13,11 +14,12 @@ import (
func TestChannelQueue(t *testing.T) {
handleChan := make(chan *testData)
- handle := func(data ...Data) {
+ handle := func(data ...Data) []Data {
for _, datum := range data {
testDatum := datum.(*testData)
handleChan <- testDatum
}
+ return nil
}
nilFn := func(_ func()) {}
@@ -52,12 +54,13 @@ func TestChannelQueue(t *testing.T) {
func TestChannelQueue_Batch(t *testing.T) {
handleChan := make(chan *testData)
- handle := func(data ...Data) {
+ handle := func(data ...Data) []Data {
assert.True(t, len(data) == 2)
for _, datum := range data {
testDatum := datum.(*testData)
handleChan <- testDatum
}
+ return nil
}
nilFn := func(_ func()) {}
@@ -95,3 +98,156 @@ func TestChannelQueue_Batch(t *testing.T) {
err = queue.Push(test1)
assert.Error(t, err)
}
+
+func TestChannelQueue_Pause(t *testing.T) {
+ lock := sync.Mutex{}
+ var queue Queue
+ var err error
+ pushBack := false
+ handleChan := make(chan *testData)
+ handle := func(data ...Data) []Data {
+ lock.Lock()
+ if pushBack {
+ if pausable, ok := queue.(Pausable); ok {
+ pausable.Pause()
+ }
+ pushBack = false
+ lock.Unlock()
+ return data
+ }
+ lock.Unlock()
+
+ for _, datum := range data {
+ testDatum := datum.(*testData)
+ handleChan <- testDatum
+ }
+ return nil
+ }
+ nilFn := func(_ func()) {}
+
+ queue, err = NewChannelQueue(handle,
+ ChannelQueueConfiguration{
+ WorkerPoolConfiguration: WorkerPoolConfiguration{
+ QueueLength: 20,
+ BatchLength: 1,
+ BlockTimeout: 0,
+ BoostTimeout: 0,
+ BoostWorkers: 0,
+ MaxWorkers: 10,
+ },
+ Workers: 1,
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(nilFn, nilFn)
+
+ test1 := testData{"A", 1}
+ test2 := testData{"B", 2}
+ queue.Push(&test1)
+
+ pausable, ok := queue.(Pausable)
+ if !assert.True(t, ok) {
+ return
+ }
+ result1 := <-handleChan
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ pausable.Pause()
+
+ paused, resumed := pausable.IsPausedIsResumed()
+
+ select {
+ case <-paused:
+ case <-resumed:
+ assert.Fail(t, "Queue should not be resumed")
+ return
+ default:
+ assert.Fail(t, "Queue is not paused")
+ return
+ }
+
+ queue.Push(&test2)
+
+ var result2 *testData
+ select {
+ case result2 = <-handleChan:
+ assert.Fail(t, "handler chan should be empty")
+ case <-time.After(100 * time.Millisecond):
+ }
+
+ assert.Nil(t, result2)
+
+ pausable.Resume()
+
+ select {
+ case <-resumed:
+ default:
+ assert.Fail(t, "Queue should be resumed")
+ }
+
+ select {
+ case result2 = <-handleChan:
+ case <-time.After(500 * time.Millisecond):
+ assert.Fail(t, "handler chan should contain test2")
+ }
+
+ assert.Equal(t, test2.TestString, result2.TestString)
+ assert.Equal(t, test2.TestInt, result2.TestInt)
+
+ lock.Lock()
+ pushBack = true
+ lock.Unlock()
+
+ paused, resumed = pausable.IsPausedIsResumed()
+
+ select {
+ case <-paused:
+ assert.Fail(t, "Queue should not be paused")
+ return
+ case <-resumed:
+ default:
+ assert.Fail(t, "Queue is not resumed")
+ return
+ }
+
+ queue.Push(&test1)
+
+ select {
+ case <-paused:
+ case <-handleChan:
+ assert.Fail(t, "handler chan should not contain test1")
+ return
+ case <-time.After(500 * time.Millisecond):
+ assert.Fail(t, "queue should be paused")
+ return
+ }
+
+ paused, resumed = pausable.IsPausedIsResumed()
+
+ select {
+ case <-paused:
+ case <-resumed:
+ assert.Fail(t, "Queue should not be resumed")
+ return
+ default:
+ assert.Fail(t, "Queue is not paused")
+ return
+ }
+
+ pausable.Resume()
+
+ select {
+ case <-resumed:
+ default:
+ assert.Fail(t, "Queue should be resumed")
+ }
+
+ select {
+ case result1 = <-handleChan:
+ case <-time.After(500 * time.Millisecond):
+ assert.Fail(t, "handler chan should contain test1")
+ }
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+}