aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/internal
diff options
context:
space:
mode:
authorJason Song <i@wolfogre.com>2023-06-23 20:37:56 +0800
committerGitHub <noreply@github.com>2023-06-23 12:37:56 +0000
commit375fd15fbfc29787323840688bacdfa0a9e9e414 (patch)
tree69c82fdad0fad24e25d2b6ddd2e5cb7766b9768b /modules/indexer/internal
parentb0215c40cdf9a3e46a45a3823b894998d1044cda (diff)
downloadgitea-375fd15fbfc29787323840688bacdfa0a9e9e414.tar.gz
gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.zip
Refactor indexer (#25174)
Refactor `modules/indexer` to make it more maintainable. And it can be easier to support more features. I'm trying to solve some of issue searching, this is a precursor to making functional changes. Current supported engines and the index versions: | engines | issues | code | | - | - | - | | db | Just a wrapper for database queries, doesn't need version | - | | bleve | The version of index is **2** | The version of index is **6** | | elasticsearch | The old index has no version, will be treated as version **0** in this PR | The version of index is **1** | | meilisearch | The old index has no version, will be treated as version **0** in this PR | - | ## Changes ### Split Splited it into mutiple packages ```text indexer ├── internal │   ├── bleve │   ├── db │   ├── elasticsearch │   └── meilisearch ├── code │   ├── bleve │   ├── elasticsearch │   └── internal └── issues ├── bleve ├── db ├── elasticsearch ├── internal └── meilisearch ``` - `indexer/interanal`: Internal shared package for indexer. - `indexer/interanal/[engine]`: Internal shared package for each engine (bleve/db/elasticsearch/meilisearch). - `indexer/code`: Implementations for code indexer. - `indexer/code/internal`: Internal shared package for code indexer. - `indexer/code/[engine]`: Implementation via each engine for code indexer. - `indexer/issues`: Implementations for issues indexer. ### Deduplication - Combine `Init/Ping/Close` for code indexer and issues indexer. - ~Combine `issues.indexerHolder` and `code.wrappedIndexer` to `internal.IndexHolder`.~ Remove it, use dummy indexer instead when the indexer is not ready. - Duplicate two copies of creating ES clients. - Duplicate two copies of `indexerID()`. ### Enhancement - [x] Support index version for elasticsearch issues indexer, the old index without version will be treated as version 0. - [x] Fix spell of `elastic_search/ElasticSearch`, it should be `Elasticsearch`. - [x] Improve versioning of ES index. We don't need `Aliases`: - Gitea does't need aliases for "Zero Downtime" because it never delete old indexes. - The old code of issues indexer uses the orignal name to create issue index, so it's tricky to convert it to an alias. - [x] Support index version for meilisearch issues indexer, the old index without version will be treated as version 0. - [x] Do "ping" only when `Ping` has been called, don't ping periodically and cache the status. - [x] Support the context parameter whenever possible. - [x] Fix outdated example config. - [x] Give up the requeue logic of issues indexer: When indexing fails, call Ping to check if it was caused by the engine being unavailable, and only requeue the task if the engine is unavailable. - It is fragile and tricky, could cause data losing (It did happen when I was doing some tests for this PR). And it works for ES only. - Just always requeue the failed task, if it caused by bad data, it's a bug of Gitea which should be fixed. --------- Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules/indexer/internal')
-rw-r--r--modules/indexer/internal/base32.go21
-rw-r--r--modules/indexer/internal/bleve/batch.go58
-rw-r--r--modules/indexer/internal/bleve/indexer.go103
-rw-r--r--modules/indexer/internal/bleve/util.go49
-rw-r--r--modules/indexer/internal/db/indexer.go33
-rw-r--r--modules/indexer/internal/elasticsearch/indexer.go92
-rw-r--r--modules/indexer/internal/elasticsearch/util.go68
-rw-r--r--modules/indexer/internal/indexer.go37
-rw-r--r--modules/indexer/internal/meilisearch/indexer.go92
-rw-r--r--modules/indexer/internal/meilisearch/util.go38
10 files changed, 591 insertions, 0 deletions
diff --git a/modules/indexer/internal/base32.go b/modules/indexer/internal/base32.go
new file mode 100644
index 0000000000..aca756c638
--- /dev/null
+++ b/modules/indexer/internal/base32.go
@@ -0,0 +1,21 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package internal
+
+import (
+ "fmt"
+ "strconv"
+)
+
+func Base36(i int64) string {
+ return strconv.FormatInt(i, 36)
+}
+
+func ParseBase36(s string) (int64, error) {
+ i, err := strconv.ParseInt(s, 36, 64)
+ if err != nil {
+ return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err)
+ }
+ return i, nil
+}
diff --git a/modules/indexer/internal/bleve/batch.go b/modules/indexer/internal/bleve/batch.go
new file mode 100644
index 0000000000..77675147b2
--- /dev/null
+++ b/modules/indexer/internal/bleve/batch.go
@@ -0,0 +1,58 @@
+// Copyright 2021 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package bleve
+
+import (
+ "github.com/blevesearch/bleve/v2"
+)
+
+// FlushingBatch is a batch of operations that automatically flushes to the
+// underlying index once it reaches a certain size.
+type FlushingBatch struct {
+ maxBatchSize int
+ batch *bleve.Batch
+ index bleve.Index
+}
+
+// NewFlushingBatch creates a new flushing batch for the specified index. Once
+// the number of operations in the batch reaches the specified limit, the batch
+// automatically flushes its operations to the index.
+func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
+ return &FlushingBatch{
+ maxBatchSize: maxBatchSize,
+ batch: index.NewBatch(),
+ index: index,
+ }
+}
+
+// Index add a new index to batch
+func (b *FlushingBatch) Index(id string, data interface{}) error {
+ if err := b.batch.Index(id, data); err != nil {
+ return err
+ }
+ return b.flushIfFull()
+}
+
+// Delete add a delete index to batch
+func (b *FlushingBatch) Delete(id string) error {
+ b.batch.Delete(id)
+ return b.flushIfFull()
+}
+
+func (b *FlushingBatch) flushIfFull() error {
+ if b.batch.Size() < b.maxBatchSize {
+ return nil
+ }
+ return b.Flush()
+}
+
+// Flush submit the batch and create a new one
+func (b *FlushingBatch) Flush() error {
+ err := b.index.Batch(b.batch)
+ if err != nil {
+ return err
+ }
+ b.batch = b.index.NewBatch()
+ return nil
+}
diff --git a/modules/indexer/internal/bleve/indexer.go b/modules/indexer/internal/bleve/indexer.go
new file mode 100644
index 0000000000..ce06b5afcb
--- /dev/null
+++ b/modules/indexer/internal/bleve/indexer.go
@@ -0,0 +1,103 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package bleve
+
+import (
+ "context"
+ "fmt"
+
+ "code.gitea.io/gitea/modules/indexer/internal"
+ "code.gitea.io/gitea/modules/log"
+
+ "github.com/blevesearch/bleve/v2"
+ "github.com/blevesearch/bleve/v2/mapping"
+ "github.com/ethantkoenig/rupture"
+)
+
+var _ internal.Indexer = &Indexer{}
+
+// Indexer represents a basic bleve indexer implementation
+type Indexer struct {
+ Indexer bleve.Index
+
+ indexDir string
+ version int
+ mappingGetter MappingGetter
+}
+
+type MappingGetter func() (mapping.IndexMapping, error)
+
+func NewIndexer(indexDir string, version int, mappingGetter func() (mapping.IndexMapping, error)) *Indexer {
+ return &Indexer{
+ indexDir: indexDir,
+ version: version,
+ mappingGetter: mappingGetter,
+ }
+}
+
+// Init initializes the indexer
+func (i *Indexer) Init(_ context.Context) (bool, error) {
+ if i == nil {
+ return false, fmt.Errorf("cannot init nil indexer")
+ }
+
+ if i.Indexer != nil {
+ return false, fmt.Errorf("indexer is already initialized")
+ }
+
+ indexer, version, err := openIndexer(i.indexDir, i.version)
+ if err != nil {
+ return false, err
+ }
+ if indexer != nil {
+ i.Indexer = indexer
+ return true, nil
+ }
+
+ if version != 0 {
+ log.Warn("Found older bleve index with version %d, Gitea will remove it and rebuild", version)
+ }
+
+ indexMapping, err := i.mappingGetter()
+ if err != nil {
+ return false, err
+ }
+
+ indexer, err = bleve.New(i.indexDir, indexMapping)
+ if err != nil {
+ return false, err
+ }
+
+ if err = rupture.WriteIndexMetadata(i.indexDir, &rupture.IndexMetadata{
+ Version: i.version,
+ }); err != nil {
+ return false, err
+ }
+
+ i.Indexer = indexer
+
+ return false, nil
+}
+
+// Ping checks if the indexer is available
+func (i *Indexer) Ping(_ context.Context) error {
+ if i == nil {
+ return fmt.Errorf("cannot ping nil indexer")
+ }
+ if i.Indexer == nil {
+ return fmt.Errorf("indexer is not initialized")
+ }
+ return nil
+}
+
+func (i *Indexer) Close() {
+ if i == nil {
+ return
+ }
+
+ if err := i.Indexer.Close(); err != nil {
+ log.Error("Failed to close bleve indexer in %q: %v", i.indexDir, err)
+ }
+ i.Indexer = nil
+}
diff --git a/modules/indexer/internal/bleve/util.go b/modules/indexer/internal/bleve/util.go
new file mode 100644
index 0000000000..43a7c3c5ec
--- /dev/null
+++ b/modules/indexer/internal/bleve/util.go
@@ -0,0 +1,49 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package bleve
+
+import (
+ "errors"
+ "os"
+
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/util"
+
+ "github.com/blevesearch/bleve/v2"
+ "github.com/blevesearch/bleve/v2/index/upsidedown"
+ "github.com/ethantkoenig/rupture"
+)
+
+// openIndexer open the index at the specified path, checking for metadata
+// updates and bleve version updates. If index needs to be created (or
+// re-created), returns (nil, nil)
+func openIndexer(path string, latestVersion int) (bleve.Index, int, error) {
+ _, err := os.Stat(path)
+ if err != nil && os.IsNotExist(err) {
+ return nil, 0, nil
+ } else if err != nil {
+ return nil, 0, err
+ }
+
+ metadata, err := rupture.ReadIndexMetadata(path)
+ if err != nil {
+ return nil, 0, err
+ }
+ if metadata.Version < latestVersion {
+ // the indexer is using a previous version, so we should delete it and
+ // re-populate
+ return nil, metadata.Version, util.RemoveAll(path)
+ }
+
+ index, err := bleve.Open(path)
+ if err != nil {
+ if errors.Is(err, upsidedown.IncompatibleVersion) {
+ log.Warn("Indexer was built with a previous version of bleve, deleting and rebuilding")
+ return nil, 0, util.RemoveAll(path)
+ }
+ return nil, 0, err
+ }
+
+ return index, 0, nil
+}
diff --git a/modules/indexer/internal/db/indexer.go b/modules/indexer/internal/db/indexer.go
new file mode 100644
index 0000000000..3f7e00efbb
--- /dev/null
+++ b/modules/indexer/internal/db/indexer.go
@@ -0,0 +1,33 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package db
+
+import (
+ "context"
+
+ "code.gitea.io/gitea/modules/indexer/internal"
+)
+
+var _ internal.Indexer = &Indexer{}
+
+// Indexer represents a basic db indexer implementation
+type Indexer struct{}
+
+// Init initializes the indexer
+func (i *Indexer) Init(_ context.Context) (bool, error) {
+ // nothing to do
+ return false, nil
+}
+
+// Ping checks if the indexer is available
+func (i *Indexer) Ping(_ context.Context) error {
+ // No need to ping database to check if it is available.
+ // If the database goes down, Gitea will go down, so nobody will care if the indexer is available.
+ return nil
+}
+
+// Close closes the indexer
+func (i *Indexer) Close() {
+ // nothing to do
+}
diff --git a/modules/indexer/internal/elasticsearch/indexer.go b/modules/indexer/internal/elasticsearch/indexer.go
new file mode 100644
index 0000000000..2c60efad56
--- /dev/null
+++ b/modules/indexer/internal/elasticsearch/indexer.go
@@ -0,0 +1,92 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package elasticsearch
+
+import (
+ "context"
+ "fmt"
+
+ "code.gitea.io/gitea/modules/indexer/internal"
+
+ "github.com/olivere/elastic/v7"
+)
+
+var _ internal.Indexer = &Indexer{}
+
+// Indexer represents a basic elasticsearch indexer implementation
+type Indexer struct {
+ Client *elastic.Client
+
+ url string
+ indexName string
+ version int
+ mapping string
+}
+
+func NewIndexer(url, indexName string, version int, mapping string) *Indexer {
+ return &Indexer{
+ url: url,
+ indexName: indexName,
+ version: version,
+ mapping: mapping,
+ }
+}
+
+// Init initializes the indexer
+func (i *Indexer) Init(ctx context.Context) (bool, error) {
+ if i == nil {
+ return false, fmt.Errorf("cannot init nil indexer")
+ }
+ if i.Client != nil {
+ return false, fmt.Errorf("indexer is already initialized")
+ }
+
+ client, err := i.initClient()
+ if err != nil {
+ return false, err
+ }
+ i.Client = client
+
+ exists, err := i.Client.IndexExists(i.VersionedIndexName()).Do(ctx)
+ if err != nil {
+ return false, err
+ }
+ if exists {
+ return true, nil
+ }
+
+ if err := i.createIndex(ctx); err != nil {
+ return false, err
+ }
+
+ return exists, nil
+}
+
+// Ping checks if the indexer is available
+func (i *Indexer) Ping(ctx context.Context) error {
+ if i == nil {
+ return fmt.Errorf("cannot ping nil indexer")
+ }
+ if i.Client == nil {
+ return fmt.Errorf("indexer is not initialized")
+ }
+
+ resp, err := i.Client.ClusterHealth().Do(ctx)
+ if err != nil {
+ return err
+ }
+ if resp.Status != "green" {
+ // see https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html
+ return fmt.Errorf("status of elasticsearch cluster is %s", resp.Status)
+ }
+ return nil
+}
+
+// Close closes the indexer
+func (i *Indexer) Close() {
+ if i == nil {
+ return
+ }
+ i.Client = nil
+}
diff --git a/modules/indexer/internal/elasticsearch/util.go b/modules/indexer/internal/elasticsearch/util.go
new file mode 100644
index 0000000000..9e034bd553
--- /dev/null
+++ b/modules/indexer/internal/elasticsearch/util.go
@@ -0,0 +1,68 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package elasticsearch
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+
+ "github.com/olivere/elastic/v7"
+)
+
+// VersionedIndexName returns the full index name with version
+func (i *Indexer) VersionedIndexName() string {
+ return versionedIndexName(i.indexName, i.version)
+}
+
+func versionedIndexName(indexName string, version int) string {
+ if version == 0 {
+ // Old index name without version
+ return indexName
+ }
+ return fmt.Sprintf("%s.v%d", indexName, version)
+}
+
+func (i *Indexer) createIndex(ctx context.Context) error {
+ createIndex, err := i.Client.CreateIndex(i.VersionedIndexName()).BodyString(i.mapping).Do(ctx)
+ if err != nil {
+ return err
+ }
+ if !createIndex.Acknowledged {
+ return fmt.Errorf("create index %s with %s failed", i.VersionedIndexName(), i.mapping)
+ }
+
+ i.checkOldIndexes(ctx)
+
+ return nil
+}
+
+func (i *Indexer) initClient() (*elastic.Client, error) {
+ opts := []elastic.ClientOptionFunc{
+ elastic.SetURL(i.url),
+ elastic.SetSniff(false),
+ elastic.SetHealthcheckInterval(10 * time.Second),
+ elastic.SetGzip(false),
+ }
+
+ logger := log.GetLogger(log.DEFAULT)
+
+ opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace}))
+ opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info}))
+ opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error}))
+
+ return elastic.NewClient(opts...)
+}
+
+func (i *Indexer) checkOldIndexes(ctx context.Context) {
+ for v := 0; v < i.version; v++ {
+ indexName := versionedIndexName(i.indexName, v)
+ exists, err := i.Client.IndexExists(indexName).Do(ctx)
+ if err == nil && exists {
+ log.Warn("Found older elasticsearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName)
+ }
+ }
+}
diff --git a/modules/indexer/internal/indexer.go b/modules/indexer/internal/indexer.go
new file mode 100644
index 0000000000..c7f356da1e
--- /dev/null
+++ b/modules/indexer/internal/indexer.go
@@ -0,0 +1,37 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package internal
+
+import (
+ "context"
+ "fmt"
+)
+
+// Indexer defines an basic indexer interface
+type Indexer interface {
+ // Init initializes the indexer
+ // returns true if the index was opened/existed (with data populated), false if it was created/not-existed (with no data)
+ Init(ctx context.Context) (bool, error)
+ // Ping checks if the indexer is available
+ Ping(ctx context.Context) error
+ // Close closes the indexer
+ Close()
+}
+
+// NewDummyIndexer returns a dummy indexer
+func NewDummyIndexer() Indexer {
+ return &dummyIndexer{}
+}
+
+type dummyIndexer struct{}
+
+func (d *dummyIndexer) Init(ctx context.Context) (bool, error) {
+ return false, fmt.Errorf("indexer is not ready")
+}
+
+func (d *dummyIndexer) Ping(ctx context.Context) error {
+ return fmt.Errorf("indexer is not ready")
+}
+
+func (d *dummyIndexer) Close() {}
diff --git a/modules/indexer/internal/meilisearch/indexer.go b/modules/indexer/internal/meilisearch/indexer.go
new file mode 100644
index 0000000000..06747ff7e0
--- /dev/null
+++ b/modules/indexer/internal/meilisearch/indexer.go
@@ -0,0 +1,92 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package meilisearch
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/meilisearch/meilisearch-go"
+)
+
+// Indexer represents a basic meilisearch indexer implementation
+type Indexer struct {
+ Client *meilisearch.Client
+
+ url, apiKey string
+ indexName string
+ version int
+}
+
+func NewIndexer(url, apiKey, indexName string, version int) *Indexer {
+ return &Indexer{
+ url: url,
+ apiKey: apiKey,
+ indexName: indexName,
+ version: version,
+ }
+}
+
+// Init initializes the indexer
+func (i *Indexer) Init(_ context.Context) (bool, error) {
+ if i == nil {
+ return false, fmt.Errorf("cannot init nil indexer")
+ }
+
+ if i.Client != nil {
+ return false, fmt.Errorf("indexer is already initialized")
+ }
+
+ i.Client = meilisearch.NewClient(meilisearch.ClientConfig{
+ Host: i.url,
+ APIKey: i.apiKey,
+ })
+
+ _, err := i.Client.GetIndex(i.VersionedIndexName())
+ if err == nil {
+ return true, nil
+ }
+ _, err = i.Client.CreateIndex(&meilisearch.IndexConfig{
+ Uid: i.VersionedIndexName(),
+ PrimaryKey: "id",
+ })
+ if err != nil {
+ return false, err
+ }
+
+ i.checkOldIndexes()
+
+ _, err = i.Client.Index(i.VersionedIndexName()).UpdateFilterableAttributes(&[]string{"repo_id"})
+ return false, err
+}
+
+// Ping checks if the indexer is available
+func (i *Indexer) Ping(ctx context.Context) error {
+ if i == nil {
+ return fmt.Errorf("cannot ping nil indexer")
+ }
+ if i.Client == nil {
+ return fmt.Errorf("indexer is not initialized")
+ }
+ resp, err := i.Client.Health()
+ if err != nil {
+ return err
+ }
+ if resp.Status != "available" {
+ // See https://docs.meilisearch.com/reference/api/health.html#status
+ return fmt.Errorf("status of meilisearch is not available: %s", resp.Status)
+ }
+ return nil
+}
+
+// Close closes the indexer
+func (i *Indexer) Close() {
+ if i == nil {
+ return
+ }
+ if i.Client == nil {
+ return
+ }
+ i.Client = nil
+}
diff --git a/modules/indexer/internal/meilisearch/util.go b/modules/indexer/internal/meilisearch/util.go
new file mode 100644
index 0000000000..e6d8fefade
--- /dev/null
+++ b/modules/indexer/internal/meilisearch/util.go
@@ -0,0 +1,38 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package meilisearch
+
+import (
+ "fmt"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// VersionedIndexName returns the full index name with version
+func (i *Indexer) VersionedIndexName() string {
+ return versionedIndexName(i.indexName, i.version)
+}
+
+func versionedIndexName(indexName string, version int) string {
+ if version == 0 {
+ // Old index name without version
+ return indexName
+ }
+
+ // The format of the index name is <index_name>_v<version>, not <index_name>.v<version> like elasticsearch.
+ // Because meilisearch does not support "." in index name, it should contain only alphanumeric characters, hyphens (-) and underscores (_).
+ // See https://www.meilisearch.com/docs/learn/core_concepts/indexes#index-uid
+
+ return fmt.Sprintf("%s_v%d", indexName, version)
+}
+
+func (i *Indexer) checkOldIndexes() {
+ for v := 0; v < i.version; v++ {
+ indexName := versionedIndexName(i.indexName, v)
+ _, err := i.Client.GetIndex(indexName)
+ if err == nil {
+ log.Warn("Found older meilisearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName)
+ }
+ }
+}