aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues/elastic_search.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/indexer/issues/elastic_search.go')
-rw-r--r--modules/indexer/issues/elastic_search.go123
1 files changed, 104 insertions, 19 deletions
diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go
index 187b69b749..97e32b8975 100644
--- a/modules/indexer/issues/elastic_search.go
+++ b/modules/indexer/issues/elastic_search.go
@@ -8,9 +8,12 @@ import (
"context"
"errors"
"fmt"
+ "net"
"strconv"
+ "sync"
"time"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"github.com/olivere/elastic/v7"
@@ -20,8 +23,12 @@ var _ Indexer = &ElasticSearchIndexer{}
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
- client *elastic.Client
- indexerName string
+ client *elastic.Client
+ indexerName string
+ available bool
+ availabilityCallback func(bool)
+ stopTimer chan struct{}
+ lock sync.RWMutex
}
type elasticLogger struct {
@@ -56,10 +63,27 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, er
return nil, err
}
- return &ElasticSearchIndexer{
+ indexer := &ElasticSearchIndexer{
client: client,
indexerName: indexerName,
- }, nil
+ available: true,
+ stopTimer: make(chan struct{}),
+ }
+
+ ticker := time.NewTicker(10 * time.Second)
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ indexer.checkAvailability()
+ case <-indexer.stopTimer:
+ ticker.Stop()
+ return
+ }
+ }
+ }()
+
+ return indexer, nil
}
const (
@@ -93,10 +117,10 @@ const (
// Init will initialize the indexer
func (b *ElasticSearchIndexer) Init() (bool, error) {
- ctx := context.Background()
+ ctx := graceful.GetManager().HammerContext()
exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !exists {
@@ -104,7 +128,7 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
if err != nil {
- return false, err
+ return false, b.checkError(err)
}
if !createIndex.Acknowledged {
return false, errors.New("init failed")
@@ -115,6 +139,20 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
return true, nil
}
+// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
+func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ b.availabilityCallback = callback
+}
+
+// Ping checks if elastic is available
+func (b *ElasticSearchIndexer) Ping() bool {
+ b.lock.RLock()
+ defer b.lock.RUnlock()
+ return b.available
+}
+
// Index will save the index data
func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
if len(issues) == 0 {
@@ -131,8 +169,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
"content": issue.Content,
"comments": issue.Comments,
}).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
reqs := make([]elastic.BulkableRequest, 0)
@@ -154,8 +192,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
// Delete deletes indexes by ids
@@ -166,8 +204,8 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
_, err := b.client.Delete().
Index(b.indexerName).
Id(fmt.Sprintf("%d", ids[0])).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
reqs := make([]elastic.BulkableRequest, 0)
@@ -182,13 +220,13 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
- Do(context.Background())
- return err
+ Do(graceful.GetManager().HammerContext())
+ return b.checkError(err)
}
// Search searches for issues by given conditions.
// Returns the matching issue IDs
-func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+func (b *ElasticSearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
query := elastic.NewBoolQuery()
query = query.Must(kwQuery)
@@ -205,9 +243,9 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
Query(query).
Sort("_score", false).
From(start).Size(limit).
- Do(context.Background())
+ Do(ctx)
if err != nil {
- return nil, err
+ return nil, b.checkError(err)
}
hits := make([]Match, 0, limit)
@@ -225,4 +263,51 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
}
// Close implements indexer
-func (b *ElasticSearchIndexer) Close() {}
+func (b *ElasticSearchIndexer) Close() {
+ select {
+ case <-b.stopTimer:
+ default:
+ close(b.stopTimer)
+ }
+}
+
+func (b *ElasticSearchIndexer) checkError(err error) error {
+ var opErr *net.OpError
+ if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
+ return err
+ }
+
+ b.setAvailability(false)
+
+ return err
+}
+
+func (b *ElasticSearchIndexer) checkAvailability() {
+ if b.Ping() {
+ return
+ }
+
+ // Request cluster state to check if elastic is available again
+ _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
+ if err != nil {
+ b.setAvailability(false)
+ return
+ }
+
+ b.setAvailability(true)
+}
+
+func (b *ElasticSearchIndexer) setAvailability(available bool) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ if b.available == available {
+ return
+ }
+
+ b.available = available
+ if b.availabilityCallback != nil {
+ // Call the callback from within the lock to ensure that the ordering remains correct
+ b.availabilityCallback(b.available)
+ }
+}