diff options
author | zeripath <art27@cantab.net> | 2022-01-22 21:22:14 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-22 21:22:14 +0000 |
commit | a82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch) | |
tree | cb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/queue/queue_disk_channel_test.go | |
parent | 27ee01e1e866f2f13603af65224ddae77d5149d7 (diff) | |
download | gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip |
Pause queues (#15928)
* Start adding mechanism to return unhandled data
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Create pushback interface
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add Pausable interface to WorkerPool and Manager
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and PushBack for the bytefifos
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Wire in UI for pausing
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add testcases and fix a few issues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix build
Signed-off-by: Andrew Thornton <art27@cantab.net>
* prevent "race" in the test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix jsoniter mismerge
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix conflicts
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix format
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add warnings for no worker configurations and prevent data-loss with redis/levelqueue
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Use StopTimer
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: 6543 <6543@obermui.de>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/queue/queue_disk_channel_test.go')
-rw-r--r-- | modules/queue/queue_disk_channel_test.go | 292 |
1 files changed, 291 insertions, 1 deletions
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index db12d9575c..9bbd146efe 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -8,7 +8,9 @@ import ( "os" "sync" "testing" + "time" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" @@ -16,7 +18,7 @@ import ( func TestPersistableChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { for _, datum := range data { if datum == nil { continue @@ -24,6 +26,7 @@ func TestPersistableChannelQueue(t *testing.T) { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } lock := sync.Mutex{} @@ -189,3 +192,290 @@ func TestPersistableChannelQueue(t *testing.T) { callback() } } + +func TestPersistableChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + log.Info("pausing") + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + queueShutdown := []func(){} + queueTerminate := []func(){} + + tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data") + assert.NoError(t, err) + defer util.RemoveAll(tmpDir) + + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "first", + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + lock.Lock() + callbacks := make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() + // Now shutdown the queue + for _, callback := range callbacks { + callback() + } + + // Wait til it is closed + <-queue.(*PersistableChannelQueue).closed + + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + // terminate the queue + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + lock.Lock() + pushBack = true + lock.Unlock() + + // Reopen queue + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 1, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "second", + }, &testData{}) + assert.NoError(t, err) + pausable, ok = queue.(Pausable) + if !assert.True(t, ok) { + return + } + + paused, _ = pausable.IsPausedIsResumed() + + go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + case <-paused: + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + pausable.Resume() + + result3 := <-handleChan + result4 := <-handleChan + if result4.TestString == test1.TestString { + result3, result4 = result4, result3 + } + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + lock.Lock() + callbacks = make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { + callback() + } +} |