From a82fd98d5368a75cbcf6b74c12f58f3f81e66662 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 22 Jan 2022 21:22:14 +0000 Subject: Pause queues (#15928) * Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton * Create pushback interface Signed-off-by: Andrew Thornton * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton * Wire in UI for pausing Signed-off-by: Andrew Thornton * add testcases and fix a few issues Signed-off-by: Andrew Thornton * fix build Signed-off-by: Andrew Thornton * prevent "race" in the test Signed-off-by: Andrew Thornton * fix jsoniter mismerge Signed-off-by: Andrew Thornton * fix conflicts Signed-off-by: Andrew Thornton * fix format Signed-off-by: Andrew Thornton * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton * Use StopTimer Signed-off-by: Andrew Thornton Co-authored-by: Lauris BH Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick Co-authored-by: wxiaoguang --- modules/indexer/issues/indexer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'modules/indexer/issues') diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 8530210628..729981ec71 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { indexer := holder.get() if indexer == nil { log.Error("Issue indexer handler: unable to get indexer!") - return + return data } iData := make([]*IndexerData, 0, len(data)) @@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) { if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) } + return nil } issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) -- cgit v1.2.3