diff options
author | zeripath <art27@cantab.net> | 2022-01-22 21:22:14 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-22 21:22:14 +0000 |
commit | a82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch) | |
tree | cb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules | |
parent | 27ee01e1e866f2f13603af65224ddae77d5149d7 (diff) | |
download | gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip |
Pause queues (#15928)
* Start adding mechanism to return unhandled data
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Create pushback interface
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add Pausable interface to WorkerPool and Manager
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and PushBack for the bytefifos
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and PushBack for ChannelQueues and ChannelUniqueQueues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Wire in UI for pausing
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add testcases and fix a few issues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix build
Signed-off-by: Andrew Thornton <art27@cantab.net>
* prevent "race" in the test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix jsoniter mismerge
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix conflicts
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix format
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add warnings for no worker configurations and prevent data-loss with redis/levelqueue
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Use StopTimer
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: 6543 <6543@obermui.de>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules')
23 files changed, 1314 insertions, 115 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 4c7a1d4f17..9ae3abff60 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -133,11 +133,11 @@ func Init() { // Create the Queue switch setting.Indexer.RepoType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { idx, err := indexer.get() if idx == nil || err != nil { log.Error("Codes indexer handler: unable to get indexer!") - return + return data } for _, datum := range data { @@ -153,6 +153,7 @@ func Init() { continue } } + return nil } indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 8530210628..729981ec71 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { indexer := holder.get() if indexer == nil { log.Error("Issue indexer handler: unable to get indexer!") - return + return data } iData := make([]*IndexerData, 0, len(data)) @@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) { if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) } + return nil } issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go index b458444697..f983fcd11d 100644 --- a/modules/indexer/stats/queue.go +++ b/modules/indexer/stats/queue.go @@ -17,13 +17,14 @@ import ( var statsQueue queue.UniqueQueue // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { 
opts := datum.(int64) if err := indexer.Index(opts); err != nil { log.Error("stats queue indexer.Index(%d) failed: %v", opts, err) } } + return nil } func initStatsQueue() error { diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go index ecedd70193..a27c5f699c 100644 --- a/modules/notification/ui/ui.go +++ b/modules/notification/ui/ui.go @@ -38,13 +38,14 @@ func NewNotifier() base.Notifier { return ns } -func (ns *notificationService) handle(data ...queue.Data) { +func (ns *notificationService) handle(data ...queue.Data) []queue.Data { for _, datum := range data { opts := datum.(issueNotificationOpts) if err := models.CreateOrUpdateIssueNotifications(opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil { log.Error("Was unable to create issue notification: %v", err) } } + return nil } func (ns *notificationService) Run() { diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go index 3a10c8e125..bb98d468fb 100644 --- a/modules/queue/bytefifo.go +++ b/modules/queue/bytefifo.go @@ -16,6 +16,8 @@ type ByteFIFO interface { Pop(ctx context.Context) ([]byte, error) // Close this fifo Close() error + // PushBack pushes data back to the top of the fifo + PushBack(ctx context.Context, data []byte) error } // UniqueByteFIFO defines a FIFO that Uniques its contents @@ -50,6 +52,11 @@ func (*DummyByteFIFO) Len(ctx context.Context) int64 { return 0 } +// PushBack pushes data back to the top of the fifo +func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error { + return nil +} + var _ UniqueByteFIFO = &DummyUniqueByteFIFO{} // DummyUniqueByteFIFO represents a dummy unique fifo diff --git a/modules/queue/manager.go b/modules/queue/manager.go index e0384d15a3..56298a3e00 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -54,6 +54,18 @@ type Flushable interface { IsEmpty() bool } +// Pausable represents a pool or queue that is Pausable +type Pausable interface { + // IsPaused will 
return if the pool or queue is paused + IsPaused() bool + // Pause will pause the pool or queue + Pause() + // Resume will resume the pool or queue + Resume() + // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed + IsPausedIsResumed() (paused, resumed <-chan struct{}) +} + // ManagedPool is a simple interface to get certain details from a worker pool type ManagedPool interface { // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group @@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Done() continue } + if pausable, ok := mq.Managed.(Pausable); ok { + // no point flushing paused queues + if pausable.IsPaused() { + wg.Done() + continue + } + } + allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) @@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error log.Debug("All queues are empty") break } - // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign + // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing // but don't delay cancellation here. 
select { case <-ctx.Done(): @@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can return nil } +// Flushable returns true if the queue is flushable +func (q *ManagedQueue) Flushable() bool { + _, ok := q.Managed.(Flushable) + return ok +} + // Flush flushes the queue with a timeout func (q *ManagedQueue) Flush(timeout time.Duration) error { if flushable, ok := q.Managed.(Flushable); ok { @@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool { return true } +// Pausable returns whether the queue is Pausable +func (q *ManagedQueue) Pausable() bool { + _, ok := q.Managed.(Pausable) + return ok +} + +// Pause pauses the queue +func (q *ManagedQueue) Pause() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Pause() + } +} + +// IsPaused reveals if the queue is paused +func (q *ManagedQueue) IsPaused() bool { + if pausable, ok := q.Managed.(Pausable); ok { + return pausable.IsPaused() + } + return false +} + +// Resume resumes the queue +func (q *ManagedQueue) Resume() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Resume() + } +} + // NumberOfWorkers returns the number of workers in the queue func (q *ManagedQueue) NumberOfWorkers() int { if pool, ok := q.Managed.(ManagedPool); ok { diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 80a9f1f2c7..3a51965143 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -36,7 +36,7 @@ type Type string type Data interface{} // HandlerFunc is a function that takes a variable amount of data and processes it -type HandlerFunc func(...Data) +type HandlerFunc func(...Data) (unhandled []Data) // NewQueueFunc is a function that creates a queue type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error) @@ -61,6 +61,12 @@ type Queue interface { Push(Data) error } +// PushBackable queues can be pushed back to +type PushBackable interface { + // PushBack pushes data back to the top of the fifo + 
PushBack(Data) error +} + // DummyQueueType is the type for the dummy queue const DummyQueueType Type = "dummy" diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index c4d5d20a89..0380497ea6 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -8,10 +8,12 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" ) // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue @@ -52,8 +54,7 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + q := &ByteFIFOQueue{ byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -65,7 +66,17 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem name: config.Name, waitOnEmpty: config.WaitOnEmpty, pushed: make(chan struct{}, 1), - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) 
{ + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Name returns the name of this queue @@ -78,6 +89,24 @@ func (q *ByteFIFOQueue) Push(data Data) error { return q.PushFunc(data, nil) } +// PushBack pushes data to the fifo +func (q *ByteFIFOQueue) PushBack(data Data) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + bs, err := json.Marshal(data) + if err != nil { + return err + } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + return q.byteFIFO.PushBack(q.terminateCtx, bs) +} + // PushFunc pushes data to the fifo func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { @@ -87,14 +116,12 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } - if q.waitOnEmpty { - defer func() { - select { - case q.pushed <- struct{}{}: - default: - } - }() - } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } @@ -108,6 +135,15 @@ func (q *ByteFIFOQueue) IsEmpty() bool { return q.byteFIFO.Len(q.terminateCtx) == 0 } +// Flush flushes the ByteFIFOQueue +func (q *ByteFIFOQueue) Flush(timeout time.Duration) error { + select { + case q.pushed <- struct{}{}: + default: + } + return q.WorkerPool.Flush(timeout) +} + // Run runs the bytefifo queue func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) @@ -142,31 +178,67 @@ func (q *ByteFIFOQueue) readToChan() { // Default backoff values backOffTime := time.Millisecond * 100 + backOffTimer := time.NewTimer(0) + util.StopTimer(backOffTimer) + + paused, _ := q.IsPausedIsResumed() loop: for { - err := q.doPop() - if err == errQueueEmpty { - log.Trace("%s: %s Waiting on Empty", q.typ, 
q.name) + select { + case <-paused: + log.Trace("Queue %s pausing", q.name) + _, resumed := q.IsPausedIsResumed() + select { - case <-q.pushed: - // reset backOffTime - backOffTime = 100 * time.Millisecond - continue loop + case <-resumed: + paused, _ = q.IsPausedIsResumed() + log.Trace("Queue %s resuming", q.name) + if q.HasNoWorkerScaling() { + log.Warn( + "Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", q.name) + q.Pause() + continue loop + } case <-q.shutdownCtx.Done(): - // Oops we've been shutdown whilst waiting - // Make sure the worker pool is shutdown too + // tell the pool to shutdown. q.baseCtxCancel() return + case data := <-q.dataChan: + if err := q.PushBack(data); err != nil { + log.Error("Unable to push back data into queue %s", q.name) + } + atomic.AddInt64(&q.numInQueue, -1) } + default: } - // Reset the backOffTime if there is no error or an unmarshalError - if err == nil || err == errUnmarshal { - backOffTime = 100 * time.Millisecond + // empty the pushed channel + select { + case <-q.pushed: + default: } + err := q.doPop() + + util.StopTimer(backOffTimer) + if err != nil { + if err == errQueueEmpty && q.waitOnEmpty { + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + + // reset the backoff time but don't set the timer + backOffTime = 100 * time.Millisecond + } else if err == errUnmarshal { + // reset the timer and backoff + backOffTime = 100 * time.Millisecond + backOffTimer.Reset(backOffTime) + } else { + // backoff + backOffTimer.Reset(backOffTime) + } + // Need to Backoff select { case <-q.shutdownCtx.Done(): @@ -174,8 +246,13 @@ loop: // Make sure the worker pool is shutdown too q.baseCtxCancel() return - case <-time.After(backOffTime): - // OK we've waited - so backoff a bit + case <-q.pushed: + // Data has been pushed to the fifo (or flush has been called) 
+ // reset the backoff time + backOffTime = 100 * time.Millisecond + continue loop + case <-backOffTimer.C: + // Calculate the next backoff time backOffTime += backOffTime / 2 if backOffTime > maxBackOffTime { backOffTime = maxBackOffTime @@ -183,6 +260,10 @@ loop: continue loop } } + + // Reset the backoff time + backOffTime = 100 * time.Millisecond + select { case <-q.shutdownCtx.Done(): // Oops we've been shutdown @@ -289,9 +370,8 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOUniqueQueue{ + q := &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -302,7 +382,17 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun workers: config.Workers, name: config.Name, }, - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) 
{ + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Has checks if the provided data is in the queue diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 4df64b69ee..7de9c17c86 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,6 +7,8 @@ package queue import ( "context" "fmt" + "sync/atomic" + "time" "code.gitea.io/gitea/modules/log" ) @@ -51,7 +53,6 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -60,6 +61,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro workers: config.Workers, name: config.Name, } + queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data { + unhandled := handle(data...) + if len(unhandled) > 0 { + // We can only pushback to the channel if we're paused. 
+ if queue.IsPaused() { + atomic.AddInt64(&queue.numInQueue, int64(len(unhandled))) + go func() { + for _, datum := range data { + queue.dataChan <- datum + } + }() + return nil + } + } + return unhandled + }, config.WorkerPoolConfiguration) + queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } @@ -81,6 +99,39 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (q *ChannelQueue) Flush(timeout time.Duration) error { + if q.IsPaused() { + return nil + } + ctx, cancel := q.commonRegisterWorkers(1, timeout, true) + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +func (q *ChannelQueue) FlushWithContext(ctx context.Context) error { + log.Trace("ChannelQueue: %d Flush", q.qid) + paused, _ := q.IsPausedIsResumed() + for { + select { + case <-paused: + return nil + case data := <-q.dataChan: + if unhandled := q.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", q.qid) + } + atomic.AddInt64(&q.numInQueue, -1) + case <-q.baseCtx.Done(): + return q.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + // Shutdown processing from this queue func (q *ChannelQueue) Shutdown() { q.lock.Lock() @@ -94,6 +145,7 @@ func (q *ChannelQueue) Shutdown() { log.Trace("ChannelQueue: %s Shutting down", q.name) go func() { log.Trace("ChannelQueue: %s Flushing", q.name) + // We can't use Cleanup here because that will close the channel if err := q.FlushWithContext(q.terminateCtx); err != nil { log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) return diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index f1ddd7ec92..b700b28a14 100644 --- a/modules/queue/queue_channel_test.go +++ 
b/modules/queue/queue_channel_test.go @@ -5,6 +5,7 @@ package queue import ( + "sync" "testing" "time" @@ -13,11 +14,12 @@ import ( func TestChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} @@ -52,12 +54,13 @@ func TestChannelQueue(t *testing.T) { func TestChannelQueue_Batch(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} @@ -95,3 +98,156 @@ func TestChannelQueue_Batch(t *testing.T) { err = queue.Push(test1) assert.Error(t, err) } + +func TestChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + nilFn := func(_ func()) {} + + queue, err = NewChannelQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 1, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + queue.Push(&test1) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, 
result1.TestInt) + + pausable.Pause() + + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) +} diff --git a/modules/queue/queue_disk.go 
b/modules/queue/queue_disk.go index 911233a5d9..2691ab02f5 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -94,6 +94,11 @@ func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn fu return fifo.internal.LPush(data) } +// PushBack pushes data to the top of the fifo +func (fifo *LevelQueueByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.internal.RPush(data) +} + // Pop pops data from the start of the fifo func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index f3cd132d7d..3b21575a0e 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -51,7 +51,20 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } config := configInterface.(PersistableChannelQueueConfiguration) - channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ + queue := &PersistableChannelQueue{ + closed: make(chan struct{}), + } + + wrappedHandle := func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) 
{ + if fail := queue.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + } + + channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, @@ -84,15 +97,12 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( DataDir: config.DataDir, } - levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) + levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar) if err == nil { - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - internal: levelQueue.(*LevelQueue), - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + internal: levelQueue.(*LevelQueue), + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -102,16 +112,13 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( return nil, ErrInvalidConfiguration{cfg: cfg} } - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - cfg: levelCfg, - underlying: LevelQueueType, - timeout: config.Timeout, - maxAttempts: config.MaxAttempts, - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + cfg: levelCfg, + underlying: LevelQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -132,6 +139,19 @@ func (q *PersistableChannelQueue) Push(data Data) error { } } +// PushBack will push the indexer data to queue +func (q *PersistableChannelQueue) PushBack(data Data) error { + select 
{ + case <-q.closed: + if pbr, ok := q.internal.(PushBackable); ok { + return pbr.PushBack(data) + } + return q.internal.Push(data) + default: + return q.channelQueue.Push(data) + } +} + // Run starts to run the queue func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) @@ -226,6 +246,48 @@ func (q *PersistableChannelQueue) IsEmpty() bool { return q.internal.IsEmpty() } +// IsPaused returns if the pool is paused +func (q *PersistableChannelQueue) IsPaused() bool { + return q.channelQueue.IsPaused() +} + +// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed +func (q *PersistableChannelQueue) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { + return q.channelQueue.IsPausedIsResumed() +} + +// Pause pauses the WorkerPool +func (q *PersistableChannelQueue) Pause() { + q.channelQueue.Pause() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Pause() +} + +// Resume resumes the WorkerPool +func (q *PersistableChannelQueue) Resume() { + q.channelQueue.Resume() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Resume() +} + // Shutdown processing this queue func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index db12d9575c..9bbd146efe 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -8,7 +8,9 @@ import ( "os" "sync" "testing" + "time" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" @@ -16,7 +18,7 @@ import ( func 
TestPersistableChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { for _, datum := range data { if datum == nil { continue @@ -24,6 +26,7 @@ func TestPersistableChannelQueue(t *testing.T) { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } lock := sync.Mutex{} @@ -189,3 +192,290 @@ func TestPersistableChannelQueue(t *testing.T) { callback() } } + +func TestPersistableChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + log.Info("pausing") + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + queueShutdown := []func(){} + queueTerminate := []func(){} + + tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data") + assert.NoError(t, err) + defer util.RemoveAll(tmpDir) + + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "first", + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + 
+ pausable.Pause() + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + lock.Lock() + callbacks := make([]func(), len(queueShutdown)) + 
copy(callbacks, queueShutdown) + lock.Unlock() + // Now shutdown the queue + for _, callback := range callbacks { + callback() + } + + // Wait til it is closed + <-queue.(*PersistableChannelQueue).closed + + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + // terminate the queue + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + lock.Lock() + pushBack = true + lock.Unlock() + + // Reopen queue + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 1, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "second", + }, &testData{}) + assert.NoError(t, err) + pausable, ok = queue.(Pausable) + if !assert.True(t, ok) { + return + } + + paused, _ = pausable.IsPausedIsResumed() + + go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() + queueTerminate = append(queueTerminate, terminate) + }) + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + case <-paused: + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + pausable.Resume() + + result3 := <-handleChan + result4 := <-handleChan + if result4.TestString == test1.TestString { + result3, result4 
= result4, result3 + } + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + lock.Lock() + callbacks = make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() + for _, callback := range callbacks { + callback() + } + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { + callback() + } +} diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index a2c21fec08..d2d8e135cb 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -17,12 +17,13 @@ import ( func TestLevelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } var lock sync.Mutex diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index a5fb866dc1..84ab235d5e 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -57,6 +57,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) type redisClient interface { RPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd + LPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd LPop(ctx context.Context, key string) *redis.StringCmd LLen(ctx context.Context, key string) *redis.IntCmd SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd @@ -103,6 +104,11 @@ func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() return fifo.client.RPush(ctx, fifo.queueName, data).Err() } +// PushBack pushes data to the top of the fifo +func (fifo *RedisByteFIFO) PushBack(ctx 
context.Context, data []byte) error { + return fifo.client.LPush(ctx, fifo.queueName, data).Err() +} + // Pop pops data from the start of the fifo func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() diff --git a/modules/queue/setting.go b/modules/queue/setting.go index caaf123d42..61f156c377 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -65,6 +65,16 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { log.Error("Unable to create queue for %s: %v", name, err) return nil } + + // Sanity check configuration + if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) { + log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name) + if pausable, ok := returnable.(Pausable); ok { + log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name) + pausable.Pause() + } + } + return returnable } @@ -103,5 +113,15 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un log.Error("Unable to create unique queue for %s: %v", name, err) return nil } + + // Sanity check configuration + if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) { + log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name) + if pausable, ok := returnable.(Pausable); ok { + log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name) + pausable.Pause() + } + } + return returnable.(UniqueQueue) } diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index f617595c04..b6d2e770fc 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -8,6 
+8,8 @@ import ( "context" "fmt" "sync" + "sync/atomic" + "time" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" @@ -64,7 +66,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue workers: config.Workers, name: config.Name, } - queue.WorkerPool = NewWorkerPool(func(data ...Data) { + queue.WorkerPool = NewWorkerPool(func(data ...Data) (unhandled []Data) { for _, datum := range data { // No error is possible here because PushFunc ensures that this can be marshalled bs, _ := json.Marshal(datum) @@ -73,8 +75,20 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue delete(queue.table, string(bs)) queue.lock.Unlock() - handle(datum) + if u := handle(datum); u != nil { + if queue.IsPaused() { + // We can only pushback to the channel if we're paused. + go func() { + if err := queue.Push(u[0]); err != nil { + log.Error("Unable to push back to queue %d. Error: %v", queue.qid, err) + } + }() + } else { + unhandled = append(unhandled, u...) 
+ } + } } + return unhandled }, config.WorkerPoolConfiguration) queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar) @@ -143,6 +157,42 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { return has, nil } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error { + if q.IsPaused() { + return nil + } + ctx, cancel := q.commonRegisterWorkers(1, timeout, true) + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error { + log.Trace("ChannelUniqueQueue: %d Flush", q.qid) + paused, _ := q.IsPausedIsResumed() + for { + select { + case <-paused: + return nil + default: + } + select { + case data := <-q.dataChan: + if unhandled := q.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", q.qid) + } + atomic.AddInt64(&q.numInQueue, -1) + case <-q.baseCtx.Done(): + return q.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + // Shutdown processing from this queue func (q *ChannelUniqueQueue) Shutdown() { log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go new file mode 100644 index 0000000000..ef6752079e --- /dev/null +++ b/modules/queue/unique_queue_channel_test.go @@ -0,0 +1,252 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package queue + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestChannelUniqueQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + nilFn := func(_ func()) {} + + queue, err := NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 0, + MaxWorkers: 10, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, + Workers: 0, + Name: "TestChannelQueue", + }, &testData{}) + assert.NoError(t, err) + + assert.Equal(t, queue.(*ChannelUniqueQueue).WorkerPool.boostWorkers, 5) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + go queue.Push(&test1) + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} + +func TestChannelUniqueQueue_Batch(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + nilFn := func(_ func()) {} + + queue, err := NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + queue.Push(&test1) + go queue.Push(&test2) + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + 
assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} + +func TestChannelUniqueQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + nilFn := func(_ func()) {} + + queue, err = NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 1, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + queue.Push(&test1) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, 
result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) +} diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index bb0eb7d950..dae32f75a8 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -93,6 +93,11 @@ func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, return fifo.internal.LPushFunc(data, fn) } +// PushBack pushes data to the top of the fifo +func (fifo *LevelUniqueQueueByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.internal.RPush(data) +} + // Pop pops data from the start of the fifo func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index d71f5e2b04..7fc304b17e 100644 --- 
a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -51,7 +51,20 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac } config := configInterface.(PersistableChannelUniqueQueueConfiguration) - channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{ + queue := &PersistableChannelUniqueQueue{ + closed: make(chan struct{}), + } + + wrappedHandle := func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := queue.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + } + + channelUniqueQueue, err := NewChannelUniqueQueue(wrappedHandle, ChannelUniqueQueueConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, @@ -84,18 +97,16 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac DataDir: config.DataDir, } - queue := &PersistableChannelUniqueQueue{ - channelQueue: channelUniqueQueue.(*ChannelUniqueQueue), - closed: make(chan struct{}), - } + queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue) - levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { + levelQueue, err := NewLevelUniqueQueue(func(data ...Data) []Data { for _, datum := range data { err := queue.Push(datum) if err != nil && err != ErrAlreadyInQueue { log.Error("Unable push to channelled queue: %v", err) } } + return nil }, levelCfg, exemplar) if err == nil { queue.delayedStarter = delayedStarter{ @@ -142,6 +153,19 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err } } +// PushBack will push the indexer data to queue +func (q *PersistableChannelUniqueQueue) PushBack(data Data) error { + select { + case <-q.closed: + if pbr, ok := q.internal.(PushBackable); ok { + return pbr.PushBack(data) + } + return q.internal.Push(data) + default: + return q.channelQueue.Push(data) + } +} + // Has 
will test if the queue has the data func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { // This is more difficult... @@ -163,13 +187,14 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) q.lock.Lock() if q.internal == nil { - err := q.setInternal(atShutdown, func(data ...Data) { + err := q.setInternal(atShutdown, func(data ...Data) []Data { for _, datum := range data { err := q.Push(datum) if err != nil && err != ErrAlreadyInQueue { log.Error("Unable push to channelled queue: %v", err) } } + return nil }, q.channelQueue.exemplar) q.lock.Unlock() if err != nil { diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 7474c09665..477d5dd81f 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -105,6 +105,18 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn f return fifo.client.RPush(ctx, fifo.queueName, data).Err() } +// PushBack pushes data to the top of the fifo +func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error { + added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result() + if err != nil { + return err + } + if added == 0 { + return ErrAlreadyInQueue + } + return fifo.client.LPush(ctx, fifo.queueName, data).Err() +} + // Pop pops data from the start of the fifo func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go index 8c815218dd..32fa9ed970 100644 --- a/modules/queue/unique_queue_wrapped.go +++ b/modules/queue/unique_queue_wrapped.go @@ -73,7 +73,7 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue // wrapped.handle is passed to the delayedStarting internal queue and is run to handle // data passed to - wrapped.handle = func(data ...Data) { + wrapped.handle = 
func(data ...Data) (unhandled []Data) { for _, datum := range data { wrapped.tlock.Lock() if !wrapped.ready { @@ -87,8 +87,11 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } } wrapped.tlock.Unlock() - handle(datum) + if u := handle(datum); u != nil { + unhandled = append(unhandled, u...) + } } + return unhandled } _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar) return wrapped, nil diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 653d0558c8..da56216dcb 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -22,6 +22,8 @@ type WorkerPool struct { lock sync.Mutex baseCtx context.Context baseCtxCancel context.CancelFunc + paused chan struct{} + resumed chan struct{} cond *sync.Cond qid int64 maxNumberOfWorkers int @@ -35,6 +37,11 @@ type WorkerPool struct { numInQueue int64 } +var ( + _ Flushable = &WorkerPool{} + _ ManagedPool = &WorkerPool{} +) + // WorkerPoolConfiguration is the basic configuration for a WorkerPool type WorkerPoolConfiguration struct { QueueLength int @@ -50,11 +57,15 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo ctx, cancel := context.WithCancel(context.Background()) dataChan := make(chan Data, config.QueueLength) + resumed := make(chan struct{}) + close(resumed) pool := &WorkerPool{ baseCtx: ctx, baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, + resumed: resumed, + paused: make(chan struct{}), handle: handle, blockTimeout: config.BlockTimeout, boostTimeout: config.BoostTimeout, @@ -69,6 +80,14 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) p.lock.Lock() + select { + case <-p.paused: + p.lock.Unlock() + p.dataChan <- data + return + default: + } + if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { 
if p.numberOfWorkers == 0 { p.zeroBoost() @@ -82,6 +101,17 @@ func (p *WorkerPool) Push(data Data) { } } +// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting +func (p *WorkerPool) HasNoWorkerScaling() bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.hasNoWorkerScaling() +} + +func (p *WorkerPool) hasNoWorkerScaling() bool { + return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0) +} + func (p *WorkerPool) zeroBoost() { ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) @@ -272,6 +302,12 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, p.cond.Broadcast() cancel() } + if p.hasNoWorkerScaling() { + log.Warn( + "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) + p.pause() + } p.lock.Unlock() }() } @@ -290,13 +326,65 @@ func (p *WorkerPool) Wait() { p.cond.Wait() } +// IsPaused returns if the pool is paused +func (p *WorkerPool) IsPaused() bool { + p.lock.Lock() + defer p.lock.Unlock() + select { + case <-p.paused: + return true + default: + return false + } +} + +// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed +func (p *WorkerPool) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { + p.lock.Lock() + defer p.lock.Unlock() + return p.paused, p.resumed +} + +// Pause pauses the WorkerPool +func (p *WorkerPool) Pause() { + p.lock.Lock() + defer p.lock.Unlock() + p.pause() +} + +func (p *WorkerPool) pause() { + select { + case <-p.paused: + default: + p.resumed = make(chan struct{}) + close(p.paused) + } +} + +// Resume resumes the WorkerPool +func (p *WorkerPool) Resume() { + p.lock.Lock() + defer p.lock.Unlock() + select { + case <-p.resumed: + 
default: + p.paused = make(chan struct{}) + close(p.resumed) + } +} + // CleanUp will drain the remaining contents of the channel // This should be called after AddWorkers context is closed func (p *WorkerPool) CleanUp(ctx context.Context) { log.Trace("WorkerPool: %d CleanUp", p.qid) close(p.dataChan) for data := range p.dataChan { - p.handle(data) + if unhandled := p.handle(data); unhandled != nil { + if unhandled != nil { + log.Error("Unhandled Data in clean-up of queue %d", p.qid) + } + } + atomic.AddInt64(&p.numInQueue, -1) select { case <-ctx.Done(): @@ -327,7 +415,9 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { for { select { case data := <-p.dataChan: - p.handle(data) + if unhandled := p.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1) case <-p.baseCtx.Done(): return p.baseCtx.Err() @@ -341,13 +431,45 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { func (p *WorkerPool) doWork(ctx context.Context) { delay := time.Millisecond * 300 + + // Create a common timer - we will use this elsewhere + timer := time.NewTimer(0) + util.StopTimer(timer) + + paused, _ := p.IsPausedIsResumed() data := make([]Data, 0, p.batchLength) for { select { + case <-paused: + log.Trace("Worker for Queue %d Pausing", p.qid) + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) + } + _, resumed := p.IsPausedIsResumed() + select { + case <-resumed: + paused, _ = p.IsPausedIsResumed() + log.Trace("Worker for Queue %d Resuming", p.qid) + util.StopTimer(timer) + case <-ctx.Done(): + log.Trace("Worker shutting down") + return + } + default: + } + select { + case <-paused: + // go back around case <-ctx.Done(): if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) - 
p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") @@ -357,59 +479,36 @@ func (p *WorkerPool) doWork(ctx context.Context) { // the dataChan has been closed - we should finish up: if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") return } data = append(data, datum) + util.StopTimer(timer) + if len(data) >= p.batchLength { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) data = make([]Data, 0, p.batchLength) + } else { + timer.Reset(delay) } - default: - timer := time.NewTimer(delay) - select { - case <-ctx.Done(): - util.StopTimer(timer) - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - } - log.Trace("Worker shutting down") - return - case datum, ok := <-p.dataChan: - util.StopTimer(timer) - if !ok { - // the dataChan has been closed - we should finish up: - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - } - log.Trace("Worker shutting down") - return - } - data = append(data, datum) - if len(data) >= p.batchLength { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) 
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - data = make([]Data, 0, p.batchLength) - } - case <-timer.C: - delay = time.Millisecond * 100 - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - data = make([]Data, 0, p.batchLength) + case <-timer.C: + delay = time.Millisecond * 100 + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) } - + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) + data = make([]Data, 0, p.batchLength) } } } |