diff options
author | zeripath <art27@cantab.net> | 2022-01-22 21:22:14 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-22 21:22:14 +0000 |
commit | a82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch) | |
tree | cb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/queue/queue_channel_test.go | |
parent | 27ee01e1e866f2f13603af65224ddae77d5149d7 (diff) | |
download | gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip |
Pause queues (#15928)
* Start adding mechanism to return unhandled data
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Create pushback interface
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add Pausable interface to WorkerPool and Manager
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and PushBack for the bytefifos
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Wire in UI for pausing
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add testcases and fix a few issues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix build
Signed-off-by: Andrew Thornton <art27@cantab.net>
* prevent "race" in the test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix jsoniter mismerge
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix conflicts
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix format
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add warnings for no worker configurations and prevent data-loss with redis/levelqueue
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Use StopTimer
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: 6543 <6543@obermui.de>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/queue/queue_channel_test.go')
-rw-r--r-- | modules/queue/queue_channel_test.go | 160 |
1 files changed, 158 insertions, 2 deletions
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index f1ddd7ec92..b700b28a14 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -5,6 +5,7 @@ package queue import ( + "sync" "testing" "time" @@ -13,11 +14,12 @@ import ( func TestChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} @@ -52,12 +54,13 @@ func TestChannelQueue(t *testing.T) { func TestChannelQueue_Batch(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} @@ -95,3 +98,156 @@ func TestChannelQueue_Batch(t *testing.T) { err = queue.Push(test1) assert.Error(t, err) } + +func TestChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + nilFn := func(_ func()) {} + + queue, err = NewChannelQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 1, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + queue.Push(&test1) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) +} |