summaryrefslogtreecommitdiffstats
path: root/modules/indexer
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-01-22 21:22:14 +0000
committerGitHub <noreply@github.com>2022-01-22 21:22:14 +0000
commita82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch)
treecb64c9348ee3d3194c786bb970770c06a8bd4fb1 /modules/indexer
parent27ee01e1e866f2f13603af65224ddae77d5149d7 (diff)
downloadgitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz
gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip
Pause queues (#15928)
* Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton <art27@cantab.net> * Create pushback interface Signed-off-by: Andrew Thornton <art27@cantab.net> * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton <art27@cantab.net> * Wire in UI for pausing Signed-off-by: Andrew Thornton <art27@cantab.net> * add testcases and fix a few issues Signed-off-by: Andrew Thornton <art27@cantab.net> * fix build Signed-off-by: Andrew Thornton <art27@cantab.net> * prevent "race" in the test Signed-off-by: Andrew Thornton <art27@cantab.net> * fix jsoniter mismerge Signed-off-by: Andrew Thornton <art27@cantab.net> * fix conflicts Signed-off-by: Andrew Thornton <art27@cantab.net> * fix format Signed-off-by: Andrew Thornton <art27@cantab.net> * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton <art27@cantab.net> * Use StopTimer Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'modules/indexer')
-rw-r--r--modules/indexer/code/indexer.go5
-rw-r--r--modules/indexer/issues/indexer.go5
-rw-r--r--modules/indexer/stats/queue.go3
3 files changed, 8 insertions, 5 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 4c7a1d4f17..9ae3abff60 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -133,11 +133,11 @@ func Init() {
// Create the Queue
switch setting.Indexer.RepoType {
case "bleve", "elasticsearch":
- handler := func(data ...queue.Data) {
+ handler := func(data ...queue.Data) []queue.Data {
idx, err := indexer.get()
if idx == nil || err != nil {
log.Error("Codes indexer handler: unable to get indexer!")
- return
+ return data
}
for _, datum := range data {
@@ -153,6 +153,7 @@ func Init() {
continue
}
}
+ return nil
}
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 8530210628..729981ec71 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
case "bleve", "elasticsearch":
- handler := func(data ...queue.Data) {
+ handler := func(data ...queue.Data) []queue.Data {
indexer := holder.get()
if indexer == nil {
log.Error("Issue indexer handler: unable to get indexer!")
- return
+ return data
}
iData := make([]*IndexerData, 0, len(data))
@@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) {
if err := indexer.Index(iData); err != nil {
log.Error("Error whilst indexing: %v Error: %v", iData, err)
}
+ return nil
}
issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go
index b458444697..f983fcd11d 100644
--- a/modules/indexer/stats/queue.go
+++ b/modules/indexer/stats/queue.go
@@ -17,13 +17,14 @@ import (
var statsQueue queue.UniqueQueue
// handle passed PR IDs and test the PRs
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
for _, datum := range data {
opts := datum.(int64)
if err := indexer.Index(opts); err != nil {
log.Error("stats queue indexer.Index(%d) failed: %v", opts, err)
}
}
+ return nil
}
func initStatsQueue() error {