diff options
author | zeripath <art27@cantab.net> | 2022-01-22 21:22:14 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-22 21:22:14 +0000 |
commit | a82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch) | |
tree | cb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/queue/queue_disk_channel.go | |
parent | 27ee01e1e866f2f13603af65224ddae77d5149d7 (diff) | |
download | gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip |
Pause queues (#15928)
* Start adding mechanism to return unhandled data
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Create pushback interface
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add Pausable interface to WorkerPool and Manager
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and PushBack for the bytefifos
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Wire in UI for pausing
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add testcases and fix a few issues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix build
Signed-off-by: Andrew Thornton <art27@cantab.net>
* prevent "race" in the test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix jsoniter mismerge
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix conflicts
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix format
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add warnings for no worker configurations and prevent data-loss with redis/levelqueue
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Use StopTimer
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: 6543 <6543@obermui.de>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/queue/queue_disk_channel.go')
-rw-r--r-- | modules/queue/queue_disk_channel.go | 100 |
1 files changed, 81 insertions, 19 deletions
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index f3cd132d7d..3b21575a0e 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -51,7 +51,20 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } config := configInterface.(PersistableChannelQueueConfiguration) - channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ + queue := &PersistableChannelQueue{ + closed: make(chan struct{}), + } + + wrappedHandle := func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := queue.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + } + + channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, @@ -84,15 +97,12 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( DataDir: config.DataDir, } - levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) + levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar) if err == nil { - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - internal: levelQueue.(*LevelQueue), - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + internal: levelQueue.(*LevelQueue), + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -102,16 +112,13 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( return nil, ErrInvalidConfiguration{cfg: cfg} } - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - cfg: levelCfg, - underlying: LevelQueueType, - timeout: config.Timeout, - maxAttempts: config.MaxAttempts, - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + cfg: levelCfg, + underlying: LevelQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -132,6 +139,19 @@ func (q *PersistableChannelQueue) Push(data Data) error { } } +// PushBack will push the indexer data to queue +func (q *PersistableChannelQueue) PushBack(data Data) error { + select { + case <-q.closed: + if pbr, ok := q.internal.(PushBackable); ok { + return pbr.PushBack(data) + } + return q.internal.Push(data) + default: + return q.channelQueue.Push(data) + } +} + // Run starts to run the queue func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) @@ -226,6 +246,48 @@ func (q *PersistableChannelQueue) IsEmpty() bool { return q.internal.IsEmpty() } +// IsPaused returns if the pool is paused +func (q *PersistableChannelQueue) IsPaused() bool { + return q.channelQueue.IsPaused() +} + +// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed +func (q *PersistableChannelQueue) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { + return q.channelQueue.IsPausedIsResumed() +} + +// Pause pauses the WorkerPool +func (q *PersistableChannelQueue) Pause() { + q.channelQueue.Pause() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Pause() +} + +// Resume resumes the WorkerPool +func (q *PersistableChannelQueue) Resume() { + q.channelQueue.Resume() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Resume() +} + // Shutdown processing this queue func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) |