From 8038610a4279862a87e630e4f1d1077c510f9d15 Mon Sep 17 00:00:00 2001
From: Lauris BH <lauris@nix.lv>
Date: Thu, 27 Jan 2022 10:30:51 +0200
Subject: Automatically pause queue if index service is unavailable (#15066)

* Handle keyword search error when issue indexer service is not available

* Implement automatic disabling and resume of code indexer queue
---
 modules/indexer/issues/bleve.go          |  14 +++-
 modules/indexer/issues/bleve_test.go     |   3 +-
 modules/indexer/issues/db.go             |  28 +++++--
 modules/indexer/issues/elastic_search.go | 123 ++++++++++++++++++++++++++-----
 modules/indexer/issues/indexer.go        |  55 +++++++++++++-
 modules/indexer/issues/indexer_test.go   |  17 +++--
 6 files changed, 199 insertions(+), 41 deletions(-)

(limited to 'modules/indexer/issues')

diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go
index d986a0e55e..c298b7de3e 100644
--- a/modules/indexer/issues/bleve.go
+++ b/modules/indexer/issues/bleve.go
@@ -5,6 +5,7 @@
 package issues
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"strconv"
@@ -186,6 +187,15 @@ func (b *BleveIndexer) Init() (bool, error) {
 	return false, err
 }
 
+// SetAvailabilityChangeCallback does nothing
+func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+}
+
+// Ping does nothing
+func (b *BleveIndexer) Ping() bool {
+	return true
+}
+
 // Close will close the bleve indexer
 func (b *BleveIndexer) Close() {
 	if b.indexer != nil {
@@ -229,7 +239,7 @@ func (b *BleveIndexer) Delete(ids ...int64) error {
 
 // Search searches for issues by given conditions.
 // Returns the matching issue IDs
-func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+func (b *BleveIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
 	var repoQueriesP []*query.NumericRangeQuery
 	for _, repoID := range repoIDs {
 		repoQueriesP = append(repoQueriesP, numericEqualityQuery(repoID, "RepoID"))
@@ -249,7 +259,7 @@ func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int)
 	search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false)
 	search.SortBy([]string{"-_score"})
 
-	result, err := b.indexer.Search(search)
+	result, err := b.indexer.SearchInContext(ctx, search)
 	if err != nil {
 		return nil, err
 	}
diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve_test.go
index df036fb573..926c32e242 100644
--- a/modules/indexer/issues/bleve_test.go
+++ b/modules/indexer/issues/bleve_test.go
@@ -5,6 +5,7 @@
 package issues
 
 import (
+	"context"
 	"os"
 	"testing"
 
@@ -84,7 +85,7 @@ func TestBleveIndexAndSearch(t *testing.T) {
 	}
 
 	for _, kw := range keywords {
-		res, err := indexer.Search(kw.Keyword, []int64{2}, 10, 0)
+		res, err := indexer.Search(context.TODO(), kw.Keyword, []int64{2}, 10, 0)
 		assert.NoError(t, err)
 
 		ids := make([]int64, 0, len(res.Hits))
diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go
index f02cbddce8..e2badf64f2 100644
--- a/modules/indexer/issues/db.go
+++ b/modules/indexer/issues/db.go
@@ -4,33 +4,47 @@
 
 package issues
 
-import "code.gitea.io/gitea/models"
+import (
+	"context"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/models/db"
+)
 
 // DBIndexer implements Indexer interface to use database's like search
 type DBIndexer struct{}
 
 // Init dummy function
-func (db *DBIndexer) Init() (bool, error) {
+func (i *DBIndexer) Init() (bool, error) {
 	return false, nil
 }
 
+// SetAvailabilityChangeCallback dummy function
+func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+}
+
+// Ping checks if database is available
+func (i *DBIndexer) Ping() bool {
+	return db.GetEngine(db.DefaultContext).Ping() != nil
+}
+
 // Index dummy function
-func (db *DBIndexer) Index(issue []*IndexerData) error {
+func (i *DBIndexer) Index(issue []*IndexerData) error {
 	return nil
 }
 
 // Delete dummy function
-func (db *DBIndexer) Delete(ids ...int64) error {
+func (i *DBIndexer) Delete(ids ...int64) error {
 	return nil
 }
 
 // Close dummy function
-func (db *DBIndexer) Close() {
+func (i *DBIndexer) Close() {
 }
 
 // Search dummy function
-func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
-	total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
+func (i *DBIndexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+	total, ids, err := models.SearchIssueIDsByKeyword(ctx, kw, repoIDs, limit, start)
 	if err != nil {
 		return nil, err
 	}
diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go
index 187b69b749..97e32b8975 100644
--- a/modules/indexer/issues/elastic_search.go
+++ b/modules/indexer/issues/elastic_search.go
@@ -8,9 +8,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net"
 	"strconv"
+	"sync"
 	"time"
 
+	"code.gitea.io/gitea/modules/graceful"
 	"code.gitea.io/gitea/modules/log"
 
 	"github.com/olivere/elastic/v7"
@@ -20,8 +23,12 @@ var _ Indexer = &ElasticSearchIndexer{}
 
 // ElasticSearchIndexer implements Indexer interface
 type ElasticSearchIndexer struct {
-	client      *elastic.Client
-	indexerName string
+	client               *elastic.Client
+	indexerName          string
+	available            bool
+	availabilityCallback func(bool)
+	stopTimer            chan struct{}
+	lock                 sync.RWMutex
 }
 
 type elasticLogger struct {
@@ -56,10 +63,27 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, er
 		return nil, err
 	}
 
-	return &ElasticSearchIndexer{
+	indexer := &ElasticSearchIndexer{
 		client:      client,
 		indexerName: indexerName,
-	}, nil
+		available:   true,
+		stopTimer:   make(chan struct{}),
+	}
+
+	ticker := time.NewTicker(10 * time.Second)
+	go func() {
+		for {
+			select {
+			case <-ticker.C:
+				indexer.checkAvailability()
+			case <-indexer.stopTimer:
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+
+	return indexer, nil
 }
 
 const (
@@ -93,10 +117,10 @@ const (
 
 // Init will initialize the indexer
 func (b *ElasticSearchIndexer) Init() (bool, error) {
-	ctx := context.Background()
+	ctx := graceful.GetManager().HammerContext()
 	exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
 	if err != nil {
-		return false, err
+		return false, b.checkError(err)
 	}
 
 	if !exists {
@@ -104,7 +128,7 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
 
 		createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
 		if err != nil {
-			return false, err
+			return false, b.checkError(err)
 		}
 		if !createIndex.Acknowledged {
 			return false, errors.New("init failed")
@@ -115,6 +139,20 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
 	return true, nil
 }
 
+// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
+func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+	b.availabilityCallback = callback
+}
+
+// Ping checks if elastic is available
+func (b *ElasticSearchIndexer) Ping() bool {
+	b.lock.RLock()
+	defer b.lock.RUnlock()
+	return b.available
+}
+
 // Index will save the index data
 func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
 	if len(issues) == 0 {
@@ -131,8 +169,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
 				"content":  issue.Content,
 				"comments": issue.Comments,
 			}).
-			Do(context.Background())
-		return err
+			Do(graceful.GetManager().HammerContext())
+		return b.checkError(err)
 	}
 
 	reqs := make([]elastic.BulkableRequest, 0)
@@ -154,8 +192,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
 	_, err := b.client.Bulk().
 		Index(b.indexerName).
 		Add(reqs...).
-		Do(context.Background())
-	return err
+		Do(graceful.GetManager().HammerContext())
+	return b.checkError(err)
 }
 
 // Delete deletes indexes by ids
@@ -166,8 +204,8 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
 		_, err := b.client.Delete().
 			Index(b.indexerName).
 			Id(fmt.Sprintf("%d", ids[0])).
-			Do(context.Background())
-		return err
+			Do(graceful.GetManager().HammerContext())
+		return b.checkError(err)
 	}
 
 	reqs := make([]elastic.BulkableRequest, 0)
@@ -182,13 +220,13 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
 	_, err := b.client.Bulk().
 		Index(b.indexerName).
 		Add(reqs...).
-		Do(context.Background())
-	return err
+		Do(graceful.GetManager().HammerContext())
+	return b.checkError(err)
 }
 
 // Search searches for issues by given conditions.
 // Returns the matching issue IDs
-func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+func (b *ElasticSearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
 	kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
 	query := elastic.NewBoolQuery()
 	query = query.Must(kwQuery)
@@ -205,9 +243,9 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
 		Query(query).
 		Sort("_score", false).
 		From(start).Size(limit).
-		Do(context.Background())
+		Do(ctx)
 	if err != nil {
-		return nil, err
+		return nil, b.checkError(err)
 	}
 
 	hits := make([]Match, 0, limit)
@@ -225,4 +263,51 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
 }
 
 // Close implements indexer
-func (b *ElasticSearchIndexer) Close() {}
+func (b *ElasticSearchIndexer) Close() {
+	select {
+	case <-b.stopTimer:
+	default:
+		close(b.stopTimer)
+	}
+}
+
+func (b *ElasticSearchIndexer) checkError(err error) error {
+	var opErr *net.OpError
+	if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
+		return err
+	}
+
+	b.setAvailability(false)
+
+	return err
+}
+
+func (b *ElasticSearchIndexer) checkAvailability() {
+	if b.Ping() {
+		return
+	}
+
+	// Request cluster state to check if elastic is available again
+	_, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
+	if err != nil {
+		b.setAvailability(false)
+		return
+	}
+
+	b.setAvailability(true)
+}
+
+func (b *ElasticSearchIndexer) setAvailability(available bool) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	if b.available == available {
+		return
+	}
+
+	b.available = available
+	if b.availabilityCallback != nil {
+		// Call the callback from within the lock to ensure that the ordering remains correct
+		b.availabilityCallback(b.available)
+	}
+}
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 729981ec71..3aaa27eed2 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -47,9 +47,11 @@ type SearchResult struct {
 // Indexer defines an interface to indexer issues contents
 type Indexer interface {
 	Init() (bool, error)
+	Ping() bool
+	SetAvailabilityChangeCallback(callback func(bool))
 	Index(issue []*IndexerData) error
 	Delete(ids ...int64) error
-	Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
+	Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
 	Close()
 }
 
@@ -111,6 +113,7 @@ func InitIssueIndexer(syncReindex bool) {
 			}
 
 			iData := make([]*IndexerData, 0, len(data))
+			unhandled := make([]queue.Data, 0, len(data))
 			for _, datum := range data {
 				indexerData, ok := datum.(*IndexerData)
 				if !ok {
@@ -119,13 +122,34 @@ func InitIssueIndexer(syncReindex bool) {
 				}
 				log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
 				if indexerData.IsDelete {
-					_ = indexer.Delete(indexerData.IDs...)
+					if err := indexer.Delete(indexerData.IDs...); err != nil {
+						log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
+						if indexer.Ping() {
+							continue
+						}
+						// Add back to queue
+						unhandled = append(unhandled, datum)
+					}
 					continue
 				}
 				iData = append(iData, indexerData)
 			}
+			if len(unhandled) > 0 {
+				for _, indexerData := range iData {
+					unhandled = append(unhandled, indexerData)
+				}
+				return unhandled
+			}
 			if err := indexer.Index(iData); err != nil {
 				log.Error("Error whilst indexing: %v Error: %v", iData, err)
+				if indexer.Ping() {
+					return nil
+				}
+				// Add back to queue
+				for _, indexerData := range iData {
+					unhandled = append(unhandled, indexerData)
+				}
+				return unhandled
 			}
 			return nil
 		}
@@ -193,6 +217,18 @@ func InitIssueIndexer(syncReindex bool) {
 			log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
 		}
 
+		if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
+			holder.get().SetAvailabilityChangeCallback(func(available bool) {
+				if !available {
+					log.Info("Issue index queue paused")
+					queue.Pause()
+				} else {
+					log.Info("Issue index queue resumed")
+					queue.Resume()
+				}
+			})
+		}
+
 		// Start processing the queue
 		go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
 
@@ -334,7 +370,7 @@ func DeleteRepoIssueIndexer(repo *repo_model.Repository) {
 
 // SearchIssuesByKeyword search issue ids by keywords and repo id
 // WARNNING: You have to ensure user have permission to visit repoIDs' issues
-func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
+func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) {
 	var issueIDs []int64
 	indexer := holder.get()
 
@@ -342,7 +378,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
 		log.Error("SearchIssuesByKeyword(): unable to get indexer!")
 		return nil, fmt.Errorf("unable to get issue indexer")
 	}
-	res, err := indexer.Search(keyword, repoIDs, 50, 0)
+	res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -351,3 +387,14 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
 	}
 	return issueIDs, nil
 }
+
+// IsAvailable checks if issue indexer is available
+func IsAvailable() bool {
+	indexer := holder.get()
+	if indexer == nil {
+		log.Error("IsAvailable(): unable to get indexer!")
+		return false
+	}
+
+	return indexer.Ping()
+}
diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go
index ee6ebcdd18..d516615b56 100644
--- a/modules/indexer/issues/indexer_test.go
+++ b/modules/indexer/issues/indexer_test.go
@@ -5,6 +5,7 @@
 package issues
 
 import (
+	"context"
 	"os"
 	"path"
 	"path/filepath"
@@ -56,19 +57,19 @@ func TestBleveSearchIssues(t *testing.T) {
 
 	time.Sleep(5 * time.Second)
 
-	ids, err := SearchIssuesByKeyword([]int64{1}, "issue2")
+	ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2")
 	assert.NoError(t, err)
 	assert.EqualValues(t, []int64{2}, ids)
 
-	ids, err = SearchIssuesByKeyword([]int64{1}, "first")
+	ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first")
 	assert.NoError(t, err)
 	assert.EqualValues(t, []int64{1}, ids)
 
-	ids, err = SearchIssuesByKeyword([]int64{1}, "for")
+	ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for")
 	assert.NoError(t, err)
 	assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids)
 
-	ids, err = SearchIssuesByKeyword([]int64{1}, "good")
+	ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good")
 	assert.NoError(t, err)
 	assert.EqualValues(t, []int64{1}, ids)
 }
@@ -79,19 +80,19 @@ func TestDBSearchIssues(t *testing.T) {
 	setting.Indexer.IssueType = "db"
 	InitIssueIndexer(true)
 
-	ids, err := SearchIssuesByKeyword([]int64{1}, "issue2")
+	ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2")
 	assert.NoError(t, err)
 	assert.EqualValues(t, []int64{2}, ids)
 
-	ids, err = SearchIssuesByKeyword([]int64{1}, "first")
+	ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first")
 	assert.NoError(t, err)
 	assert.EqualValues(t, []int64{1}, ids)
 
-	ids, err = SearchIssuesByKeyword([]int64{1}, "for")
+	ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for")
 	assert.NoError(t, err)
 	assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids)
 
-	ids, err = SearchIssuesByKeyword([]int64{1}, "good")
+	ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good")
 	assert.NoError(t, err)
 	assert.EqualValues(t, []int64{1}, ids)
 }
-- 
cgit v1.2.3