diff options
author | zeripath <art27@cantab.net> | 2022-01-22 21:22:14 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-22 21:22:14 +0000 |
commit | a82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch) | |
tree | cb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/indexer | |
parent | 27ee01e1e866f2f13603af65224ddae77d5149d7 (diff) | |
download | gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip |
Pause queues (#15928)
* Start adding mechanism to return unhandled data
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Create pushback interface
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add Pausable interface to WorkerPool and Manager
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and PushBack for the bytefifos
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Wire in UI for pausing
Signed-off-by: Andrew Thornton <art27@cantab.net>
* add testcases and fix a few issues
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix build
Signed-off-by: Andrew Thornton <art27@cantab.net>
* prevent "race" in the test
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix jsoniter mismerge
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix conflicts
Signed-off-by: Andrew Thornton <art27@cantab.net>
* fix format
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Add warnings for no worker configurations and prevent data-loss with redis/levelqueue
Signed-off-by: Andrew Thornton <art27@cantab.net>
* Use StopTimer
Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: 6543 <6543@obermui.de>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/indexer')
-rw-r--r-- | modules/indexer/code/indexer.go | 5 | ||||
-rw-r--r-- | modules/indexer/issues/indexer.go | 5 | ||||
-rw-r--r-- | modules/indexer/stats/queue.go | 3 |
3 files changed, 8 insertions, 5 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 4c7a1d4f17..9ae3abff60 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -133,11 +133,11 @@ func Init() { // Create the Queue switch setting.Indexer.RepoType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { idx, err := indexer.get() if idx == nil || err != nil { log.Error("Codes indexer handler: unable to get indexer!") - return + return data } for _, datum := range data { @@ -153,6 +153,7 @@ func Init() { continue } } + return nil } indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 8530210628..729981ec71 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { indexer := holder.get() if indexer == nil { log.Error("Issue indexer handler: unable to get indexer!") - return + return data } iData := make([]*IndexerData, 0, len(data)) @@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) { if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) } + return nil } issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go index b458444697..f983fcd11d 100644 --- a/modules/indexer/stats/queue.go +++ b/modules/indexer/stats/queue.go @@ -17,13 +17,14 @@ import ( var statsQueue queue.UniqueQueue // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { opts := datum.(int64) if err := indexer.Index(opts); err != nil { log.Error("stats queue indexer.Index(%d) failed: %v", opts, err) } } + return nil } func initStatsQueue() error { |