summaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues/indexer.go
diff options
context:
space:
mode:
authorLauris BH <lauris@nix.lv>2022-01-27 10:30:51 +0200
committerGitHub <noreply@github.com>2022-01-27 10:30:51 +0200
commit8038610a4279862a87e630e4f1d1077c510f9d15 (patch)
tree802489f8ddde899e76643ea157f9020f12ca1490 /modules/indexer/issues/indexer.go
parent2649eddcf0bb1190abab49c9d79ce19bfcf19e87 (diff)
downloadgitea-8038610a4279862a87e630e4f1d1077c510f9d15.tar.gz
gitea-8038610a4279862a87e630e4f1d1077c510f9d15.zip
Automatically pause queue if index service is unavailable (#15066)
* Handle keyword search error when issue indexer service is not available * Implement automatic disabling and resume of code indexer queue
Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r--modules/indexer/issues/indexer.go55
1 files changed, 51 insertions, 4 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 729981ec71..3aaa27eed2 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -47,9 +47,11 @@ type SearchResult struct {
// Indexer defines an interface to indexer issues contents
type Indexer interface {
Init() (bool, error)
+ Ping() bool
+ SetAvailabilityChangeCallback(callback func(bool))
Index(issue []*IndexerData) error
Delete(ids ...int64) error
- Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
+ Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
Close()
}
@@ -111,6 +113,7 @@ func InitIssueIndexer(syncReindex bool) {
}
iData := make([]*IndexerData, 0, len(data))
+ unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
@@ -119,13 +122,34 @@ func InitIssueIndexer(syncReindex bool) {
}
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
if indexerData.IsDelete {
- _ = indexer.Delete(indexerData.IDs...)
+ if err := indexer.Delete(indexerData.IDs...); err != nil {
+ log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
+ if indexer.Ping() {
+ continue
+ }
+ // Add back to queue
+ unhandled = append(unhandled, datum)
+ }
continue
}
iData = append(iData, indexerData)
}
+ if len(unhandled) > 0 {
+ for _, indexerData := range iData {
+ unhandled = append(unhandled, indexerData)
+ }
+ return unhandled
+ }
if err := indexer.Index(iData); err != nil {
log.Error("Error whilst indexing: %v Error: %v", iData, err)
+ if indexer.Ping() {
+ return nil
+ }
+ // Add back to queue
+ for _, indexerData := range iData {
+ unhandled = append(unhandled, indexerData)
+ }
+ return unhandled
}
return nil
}
@@ -193,6 +217,18 @@ func InitIssueIndexer(syncReindex bool) {
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
+ if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
+ holder.get().SetAvailabilityChangeCallback(func(available bool) {
+ if !available {
+ log.Info("Issue index queue paused")
+ queue.Pause()
+ } else {
+ log.Info("Issue index queue resumed")
+ queue.Resume()
+ }
+ })
+ }
+
// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
@@ -334,7 +370,7 @@ func DeleteRepoIssueIndexer(repo *repo_model.Repository) {
// SearchIssuesByKeyword search issue ids by keywords and repo id
// WARNNING: You have to ensure user have permission to visit repoIDs' issues
-func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
+func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
indexer := holder.get()
@@ -342,7 +378,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
log.Error("SearchIssuesByKeyword(): unable to get indexer!")
return nil, fmt.Errorf("unable to get issue indexer")
}
- res, err := indexer.Search(keyword, repoIDs, 50, 0)
+ res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0)
if err != nil {
return nil, err
}
@@ -351,3 +387,14 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
}
return issueIDs, nil
}
+
+// IsAvailable checks if issue indexer is available
+func IsAvailable() bool {
+ indexer := holder.get()
+ if indexer == nil {
+ log.Error("IsAvailable(): unable to get indexer!")
+ return false
+ }
+
+ return indexer.Ping()
+}