diff options
author | Jason Song <i@wolfogre.com> | 2023-06-23 20:37:56 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-23 12:37:56 +0000 |
commit | 375fd15fbfc29787323840688bacdfa0a9e9e414 (patch) | |
tree | 69c82fdad0fad24e25d2b6ddd2e5cb7766b9768b /modules/indexer/internal | |
parent | b0215c40cdf9a3e46a45a3823b894998d1044cda (diff) | |
download | gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.tar.gz gitea-375fd15fbfc29787323840688bacdfa0a9e9e414.zip |
Refactor indexer (#25174)
Refactor `modules/indexer` to make it more maintainable and to make it
easier to support more features. I'm trying to solve some problems with issue
searching; this is a precursor to making functional changes.
Current supported engines and the index versions:
| engines | issues | code |
| - | - | - |
| db | Just a wrapper for database queries, doesn't need version | - |
| bleve | The version of index is **2** | The version of index is **6** |
| elasticsearch | The old index has no version, will be treated as version **0** in this PR | The version of index is **1** |
| meilisearch | The old index has no version, will be treated as version **0** in this PR | - |
## Changes
### Split
Split it into multiple packages
```text
indexer
├── internal
│ ├── bleve
│ ├── db
│ ├── elasticsearch
│ └── meilisearch
├── code
│ ├── bleve
│ ├── elasticsearch
│ └── internal
└── issues
├── bleve
├── db
├── elasticsearch
├── internal
└── meilisearch
```
- `indexer/internal`: Internal shared package for indexer.
- `indexer/internal/[engine]`: Internal shared package for each engine
(bleve/db/elasticsearch/meilisearch).
- `indexer/code`: Implementations for code indexer.
- `indexer/code/internal`: Internal shared package for code indexer.
- `indexer/code/[engine]`: Implementation via each engine for code
indexer.
- `indexer/issues`: Implementations for issues indexer.
### Deduplication
- Combine `Init/Ping/Close` for code indexer and issues indexer.
- ~Combine `issues.indexerHolder` and `code.wrappedIndexer` to
`internal.IndexHolder`.~ Remove it, use dummy indexer instead when the
indexer is not ready.
- Deduplicate the two copies of creating ES clients.
- Deduplicate the two copies of `indexerID()`.
### Enhancement
- [x] Support index version for elasticsearch issues indexer, the old
index without version will be treated as version 0.
- [x] Fix the spelling of `elastic_search/ElasticSearch`; it should be
`Elasticsearch`.
- [x] Improve versioning of ES index. We don't need `Aliases`:
- Gitea doesn't need aliases for "Zero Downtime" because it never deletes
old indexes.
- The old code of the issues indexer uses the original name to create the issue
index, so it's tricky to convert it to an alias.
- [x] Support index version for meilisearch issues indexer, the old
index without version will be treated as version 0.
- [x] Do "ping" only when `Ping` has been called, don't ping
periodically and cache the status.
- [x] Support the context parameter whenever possible.
- [x] Fix outdated example config.
- [x] Give up the requeue logic of issues indexer: When indexing fails,
call Ping to check if it was caused by the engine being unavailable, and
only requeue the task if the engine is unavailable.
- It is fragile and tricky, and could cause data loss (it did happen when
I was doing some tests for this PR). And it works for ES only.
- Just always requeue the failed task; if the failure is caused by bad data, it's a
bug in Gitea which should be fixed.
---------
Co-authored-by: Giteabot <teabot@gitea.io>
Diffstat (limited to 'modules/indexer/internal')
-rw-r--r-- | modules/indexer/internal/base32.go | 21 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/batch.go | 58 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/indexer.go | 103 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/util.go | 49 | ||||
-rw-r--r-- | modules/indexer/internal/db/indexer.go | 33 | ||||
-rw-r--r-- | modules/indexer/internal/elasticsearch/indexer.go | 92 | ||||
-rw-r--r-- | modules/indexer/internal/elasticsearch/util.go | 68 | ||||
-rw-r--r-- | modules/indexer/internal/indexer.go | 37 | ||||
-rw-r--r-- | modules/indexer/internal/meilisearch/indexer.go | 92 | ||||
-rw-r--r-- | modules/indexer/internal/meilisearch/util.go | 38 |
10 files changed, 591 insertions, 0 deletions
diff --git a/modules/indexer/internal/base32.go b/modules/indexer/internal/base32.go
new file mode 100644
index 0000000000..aca756c638
--- /dev/null
+++ b/modules/indexer/internal/base32.go
@@ -0,0 +1,21 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package internal
+
+import (
+	"fmt"
+	"strconv"
+)
+
+// Base36 returns the base-36 text form of i, as produced by strconv.FormatInt.
+func Base36(i int64) string {
+	return strconv.FormatInt(i, 36)
+}
+
+// ParseBase36 parses s as a base-36 integer that must fit into 64 bits.
+// On failure it returns 0 and an error that quotes s and wraps the strconv error.
+func ParseBase36(s string) (int64, error) {
+	i, err := strconv.ParseInt(s, 36, 64)
+	if err != nil {
+		return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err)
+	}
+	return i, nil
+}
diff --git a/modules/indexer/internal/bleve/batch.go b/modules/indexer/internal/bleve/batch.go
new file mode 100644
index 0000000000..77675147b2
--- /dev/null
+++ b/modules/indexer/internal/bleve/batch.go
@@ -0,0 +1,58 @@
+// Copyright 2021 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package bleve
+
+import (
+	"github.com/blevesearch/bleve/v2"
+)
+
+// FlushingBatch is a batch of operations that automatically flushes to the
+// underlying index once it reaches a certain size.
+type FlushingBatch struct {
+	maxBatchSize int
+	batch        *bleve.Batch
+	index        bleve.Index
+}
+
+// NewFlushingBatch creates a new flushing batch for the specified index. Once
+// the number of operations in the batch reaches the specified limit, the batch
+// automatically flushes its operations to the index.
+func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
+	return &FlushingBatch{
+		maxBatchSize: maxBatchSize,
+		batch:        index.NewBatch(),
+		index:        index,
+	}
+}
+
+// Index queues an "index document" operation for id with the given data,
+// then flushes the batch if it has become full.
+// It returns the error of queueing the operation or of the flush.
+func (b *FlushingBatch) Index(id string, data interface{}) error {
+	if err := b.batch.Index(id, data); err != nil {
+		return err
+	}
+	return b.flushIfFull()
+}
+
+// Delete queues a "delete document" operation for id,
+// then flushes the batch if it has become full.
+func (b *FlushingBatch) Delete(id string) error {
+	b.batch.Delete(id)
+	return b.flushIfFull()
+}
+
+// flushIfFull calls Flush once b.batch.Size() has reached maxBatchSize,
+// otherwise it does nothing and returns nil.
+func (b *FlushingBatch) flushIfFull() error {
+	if b.batch.Size() < b.maxBatchSize {
+		return nil
+	}
+	return b.Flush()
+}
+
+// Flush submits the current batch to the index and, on success, replaces it with a new empty batch.
+// If submitting fails the error is returned and the current batch is kept as it is.
+func (b *FlushingBatch) Flush() error {
+	err := b.index.Batch(b.batch)
+	if err != nil {
+		return err
+	}
+	b.batch = b.index.NewBatch()
+	return nil
+}
diff --git a/modules/indexer/internal/bleve/indexer.go b/modules/indexer/internal/bleve/indexer.go
new file mode 100644
index 0000000000..ce06b5afcb
--- /dev/null
+++ b/modules/indexer/internal/bleve/indexer.go
@@ -0,0 +1,103 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package bleve
+
+import (
+	"context"
+	"fmt"
+
+	"code.gitea.io/gitea/modules/indexer/internal"
+	"code.gitea.io/gitea/modules/log"
+
+	"github.com/blevesearch/bleve/v2"
+	"github.com/blevesearch/bleve/v2/mapping"
+	"github.com/ethantkoenig/rupture"
+)
+
+var _ internal.Indexer = &Indexer{}
+
+// Indexer represents a basic bleve indexer implementation
+type Indexer struct {
+	// Indexer is the opened bleve index; it is nil until Init succeeds and after Close.
+	Indexer bleve.Index
+
+	indexDir      string
+	version       int
+	mappingGetter MappingGetter
+}
+
+// MappingGetter builds the index mapping used when a new index has to be created.
+type MappingGetter func() (mapping.IndexMapping, error)
+
+// NewIndexer returns an uninitialized indexer for the index stored in indexDir.
+// version is the index version expected by the caller, mappingGetter is only
+// invoked by Init when the index has to be (re-)created.
+func NewIndexer(indexDir string, version int, mappingGetter func() (mapping.IndexMapping, error)) *Indexer {
+	return &Indexer{
+		indexDir:      indexDir,
+		version:       version,
+		mappingGetter: mappingGetter,
+	}
+}
+
+// Init initializes the indexer.
+// It returns true if an existing index was opened, false if a new (empty) index was created.
+// It fails if the receiver is nil or has already been initialized.
+func (i *Indexer) Init(_ context.Context) (bool, error) {
+	if i == nil {
+		return false, fmt.Errorf("cannot init nil indexer")
+	}
+
+	if i.Indexer != nil {
+		return false, fmt.Errorf("indexer is already initialized")
+	}
+
+	indexer, version, err := openIndexer(i.indexDir, i.version)
+	if err != nil {
+		return false, err
+	}
+	if indexer != nil {
+		i.Indexer = indexer
+		return true, nil
+	}
+
+	// A non-zero version here means openIndexer found an outdated index and removed it.
+	if version != 0 {
+		log.Warn("Found older bleve index with version %d, Gitea will remove it and rebuild", version)
+	}
+
+	indexMapping, err := i.mappingGetter()
+	if err != nil {
+		return false, err
+	}
+
+	indexer, err = bleve.New(i.indexDir, indexMapping)
+	if err != nil {
+		return false, err
+	}
+
+	if err = rupture.WriteIndexMetadata(i.indexDir, &rupture.IndexMetadata{
+		Version: i.version,
+	}); err != nil {
+		// The new index has not been stored on i yet, so nobody else could ever close it.
+		// Close it here instead of leaking it.
+		if closeErr := indexer.Close(); closeErr != nil {
+			log.Error("Failed to close bleve indexer in %q: %v", i.indexDir, closeErr)
+		}
+		return false, err
+	}
+
+	i.Indexer = indexer
+
+	return false, nil
+}
+
+// Ping checks if the indexer is available.
+// For bleve this only verifies that the indexer has been initialized.
+func (i *Indexer) Ping(_ context.Context) error {
+	if i == nil {
+		return fmt.Errorf("cannot ping nil indexer")
+	}
+	if i.Indexer == nil {
+		return fmt.Errorf("indexer is not initialized")
+	}
+	return nil
+}
+
+// Close closes the underlying bleve index and resets the indexer to the uninitialized state.
+// It is a no-op for a nil receiver and for an indexer that is not initialized
+// (never initialized, failed to initialize, or already closed), so it is safe to call more than once.
+func (i *Indexer) Close() {
+	if i == nil || i.Indexer == nil {
+		return
+	}
+
+	if err := i.Indexer.Close(); err != nil {
+		log.Error("Failed to close bleve indexer in %q: %v", i.indexDir, err)
+	}
+	i.Indexer = nil
+}
diff --git a/modules/indexer/internal/bleve/util.go b/modules/indexer/internal/bleve/util.go
new file mode 100644
index 0000000000..43a7c3c5ec
--- /dev/null
+++ b/modules/indexer/internal/bleve/util.go
@@ -0,0 +1,49 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package bleve
+
+import (
+	"errors"
+	"os"
+
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/util"
+
+	"github.com/blevesearch/bleve/v2"
+	"github.com/blevesearch/bleve/v2/index/upsidedown"
+	"github.com/ethantkoenig/rupture"
+)
+
+// openIndexer opens the index at the specified path, checking for metadata
+// updates and bleve version updates.
+//
+// Results:
+//   - (index, 0, nil): the index exists and was opened.
+//   - (nil, 0, nil): the path does not exist, or the index was built by an
+//     incompatible bleve version and has been removed; the caller should create it.
+//   - (nil, oldVersion, err): the stored metadata version is lower than
+//     latestVersion; the index directory has been removed (err is the result of
+//     that removal) and the caller should re-create the index.
+//   - (nil, 0, err): any other failure.
+func openIndexer(path string, latestVersion int) (bleve.Index, int, error) {
+	_, err := os.Stat(path)
+	if err != nil && os.IsNotExist(err) {
+		return nil, 0, nil
+	} else if err != nil {
+		return nil, 0, err
+	}
+
+	metadata, err := rupture.ReadIndexMetadata(path)
+	if err != nil {
+		return nil, 0, err
+	}
+	if metadata.Version < latestVersion {
+		// the indexer is using a previous version, so we should delete it and
+		// re-populate
+		return nil, metadata.Version, util.RemoveAll(path)
+	}
+
+	index, err := bleve.Open(path)
+	if err != nil {
+		if errors.Is(err, upsidedown.IncompatibleVersion) {
+			log.Warn("Indexer was built with a previous version of bleve, deleting and rebuilding")
+			return nil, 0, util.RemoveAll(path)
+		}
+		return nil, 0, err
+	}
+
+	return index, 0, nil
+}
diff --git a/modules/indexer/internal/db/indexer.go b/modules/indexer/internal/db/indexer.go
new file mode 100644
index 0000000000..3f7e00efbb
--- /dev/null
+++ b/modules/indexer/internal/db/indexer.go
@@ -0,0 +1,33 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package db
+
+import (
+	"context"
+
+	"code.gitea.io/gitea/modules/indexer/internal"
+)
+
+var _ internal.Indexer = &Indexer{}
+
+// Indexer represents a basic db indexer implementation.
+// It carries no state; all of its methods are no-ops.
+type Indexer struct{}
+
+// Init initializes the indexer.
+// There is nothing to set up, it always returns (false, nil).
+func (i *Indexer) Init(_ context.Context) (bool, error) {
+	// nothing to do
+	return false, nil
+}
+
+// Ping checks if the indexer is available. It always returns nil.
+func (i *Indexer) Ping(_ context.Context) error {
+	// No need to ping database to check if it is available.
+	// If the database goes down, Gitea will go down, so nobody will care if the indexer is available.
+	return nil
+}
+
+// Close closes the indexer
+func (i *Indexer) Close() {
+	// nothing to do
+}
diff --git a/modules/indexer/internal/elasticsearch/indexer.go b/modules/indexer/internal/elasticsearch/indexer.go
new file mode 100644
index 0000000000..2c60efad56
--- /dev/null
+++ b/modules/indexer/internal/elasticsearch/indexer.go
@@ -0,0 +1,92 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" + + "github.com/olivere/elastic/v7" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic elasticsearch indexer implementation +type Indexer struct { + Client *elastic.Client + + url string + indexName string + version int + mapping string +} + +func NewIndexer(url, indexName string, version int, mapping string) *Indexer { + return &Indexer{ + url: url, + indexName: indexName, + version: version, + mapping: mapping, + } +} + +// Init initializes the indexer +func (i *Indexer) Init(ctx context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + if i.Client != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + client, err := i.initClient() + if err != nil { + return false, err + } + i.Client = client + + exists, err := i.Client.IndexExists(i.VersionedIndexName()).Do(ctx) + if err != nil { + return false, err + } + if exists { + return true, nil + } + + if err := i.createIndex(ctx); err != nil { + return false, err + } + + return exists, nil +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(ctx context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Client == nil { + return fmt.Errorf("indexer is not initialized") + } + + resp, err := i.Client.ClusterHealth().Do(ctx) + if err != nil { + return err + } + if resp.Status != "green" { + // see https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html + return fmt.Errorf("status of elasticsearch cluster is %s", resp.Status) + } + return nil +} + +// Close closes the indexer +func (i *Indexer) Close() { + if i == nil { + return + } + i.Client = nil +} diff --git a/modules/indexer/internal/elasticsearch/util.go b/modules/indexer/internal/elasticsearch/util.go new file mode 100644 index 
0000000000..9e034bd553
--- /dev/null
+++ b/modules/indexer/internal/elasticsearch/util.go
@@ -0,0 +1,68 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package elasticsearch
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"code.gitea.io/gitea/modules/log"
+
+	"github.com/olivere/elastic/v7"
+)
+
+// VersionedIndexName returns the full index name with version
+func (i *Indexer) VersionedIndexName() string {
+	return versionedIndexName(i.indexName, i.version)
+}
+
+// versionedIndexName returns "<indexName>.v<version>", or the plain indexName for version 0.
+func versionedIndexName(indexName string, version int) string {
+	if version == 0 {
+		// Old index name without version
+		return indexName
+	}
+	return fmt.Sprintf("%s.v%d", indexName, version)
+}
+
+// createIndex creates the versioned index with i.mapping as the request body.
+// It fails if the request fails or is not acknowledged; after a successful
+// creation it reports indexes of older versions via checkOldIndexes.
+func (i *Indexer) createIndex(ctx context.Context) error {
+	createIndex, err := i.Client.CreateIndex(i.VersionedIndexName()).BodyString(i.mapping).Do(ctx)
+	if err != nil {
+		return err
+	}
+	if !createIndex.Acknowledged {
+		return fmt.Errorf("create index %s with %s failed", i.VersionedIndexName(), i.mapping)
+	}
+
+	i.checkOldIndexes(ctx)
+
+	return nil
+}
+
+// initClient creates an elasticsearch client for i.url with sniffing and gzip disabled
+// and a health check interval of 10 seconds. The trace, info and error logs of the
+// client are forwarded to the matching levels of Gitea's default logger.
+func (i *Indexer) initClient() (*elastic.Client, error) {
+	opts := []elastic.ClientOptionFunc{
+		elastic.SetURL(i.url),
+		elastic.SetSniff(false),
+		elastic.SetHealthcheckInterval(10 * time.Second),
+		elastic.SetGzip(false),
+	}
+
+	logger := log.GetLogger(log.DEFAULT)
+
+	opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace}))
+	opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info}))
+	opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error}))
+
+	return elastic.NewClient(opts...)
+}
+
+// checkOldIndexes logs a warning for every index of a version lower than i.version that still exists.
+// Nothing is deleted. This is best effort: errors of the existence checks are ignored.
+func (i *Indexer) checkOldIndexes(ctx context.Context) {
+	for v := 0; v < i.version; v++ {
+		indexName := versionedIndexName(i.indexName, v)
+		exists, err := i.Client.IndexExists(indexName).Do(ctx)
+		if err == nil && exists {
+			log.Warn("Found older elasticsearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName)
+		}
+	}
+}
diff --git a/modules/indexer/internal/indexer.go b/modules/indexer/internal/indexer.go
new file mode 100644
index 0000000000..c7f356da1e
--- /dev/null
+++ b/modules/indexer/internal/indexer.go
@@ -0,0 +1,37 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package internal
+
+import (
+	"context"
+	"fmt"
+)
+
+// Indexer defines a basic indexer interface
+type Indexer interface {
+	// Init initializes the indexer
+	// returns true if the index was opened/existed (with data populated), false if it was created/not-existed (with no data)
+	Init(ctx context.Context) (bool, error)
+	// Ping checks if the indexer is available
+	Ping(ctx context.Context) error
+	// Close closes the indexer
+	Close()
+}
+
+// NewDummyIndexer returns a dummy indexer whose Init and Ping always fail
+// with "indexer is not ready" and whose Close does nothing.
+func NewDummyIndexer() Indexer {
+	return &dummyIndexer{}
+}
+
+// dummyIndexer is a stateless Indexer that is never ready.
+type dummyIndexer struct{}
+
+// Init always fails because the dummy indexer is never ready.
+func (d *dummyIndexer) Init(ctx context.Context) (bool, error) {
+	return false, fmt.Errorf("indexer is not ready")
+}
+
+// Ping always fails because the dummy indexer is never ready.
+func (d *dummyIndexer) Ping(ctx context.Context) error {
+	return fmt.Errorf("indexer is not ready")
+}
+
+// Close does nothing.
+func (d *dummyIndexer) Close() {}
diff --git a/modules/indexer/internal/meilisearch/indexer.go b/modules/indexer/internal/meilisearch/indexer.go
new file mode 100644
index 0000000000..06747ff7e0
--- /dev/null
+++ b/modules/indexer/internal/meilisearch/indexer.go
@@ -0,0 +1,92 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package meilisearch
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/meilisearch/meilisearch-go"
+)
+
+// Indexer represents a basic meilisearch indexer implementation
+type Indexer struct {
+	// Client is the meilisearch client; it is nil until Init has created it and after Close.
+	Client *meilisearch.Client
+
+	url, apiKey string
+	indexName   string
+	version     int
+}
+
+// NewIndexer returns an uninitialized indexer for the meilisearch server at url.
+// indexName and version form the uid of the index.
+func NewIndexer(url, apiKey, indexName string, version int) *Indexer {
+	return &Indexer{
+		url:       url,
+		apiKey:    apiKey,
+		indexName: indexName,
+		version:   version,
+	}
+}
+
+// Init initializes the indexer.
+// It returns true if the versioned index could be fetched, otherwise it requests the
+// creation of the index with primary key "id" and makes "repo_id" filterable, and returns false.
+// It fails if the receiver is nil or a client has already been created.
+func (i *Indexer) Init(_ context.Context) (bool, error) {
+	if i == nil {
+		return false, fmt.Errorf("cannot init nil indexer")
+	}
+
+	if i.Client != nil {
+		return false, fmt.Errorf("indexer is already initialized")
+	}
+
+	i.Client = meilisearch.NewClient(meilisearch.ClientConfig{
+		Host:   i.url,
+		APIKey: i.apiKey,
+	})
+
+	// Any error of GetIndex is handled as "the index does not exist yet".
+	_, err := i.Client.GetIndex(i.VersionedIndexName())
+	if err == nil {
+		return true, nil
+	}
+	_, err = i.Client.CreateIndex(&meilisearch.IndexConfig{
+		Uid:        i.VersionedIndexName(),
+		PrimaryKey: "id",
+	})
+	if err != nil {
+		return false, err
+	}
+
+	i.checkOldIndexes()
+
+	_, err = i.Client.Index(i.VersionedIndexName()).UpdateFilterableAttributes(&[]string{"repo_id"})
+	return false, err
+}
+
+// Ping checks if the indexer is available.
+// It queries the health endpoint and fails unless the status is "available".
+// The context is not passed on to the client call.
+func (i *Indexer) Ping(ctx context.Context) error {
+	if i == nil {
+		return fmt.Errorf("cannot ping nil indexer")
+	}
+	if i.Client == nil {
+		return fmt.Errorf("indexer is not initialized")
+	}
+	resp, err := i.Client.Health()
+	if err != nil {
+		return err
+	}
+	if resp.Status != "available" {
+		// See https://docs.meilisearch.com/reference/api/health.html#status
+		return fmt.Errorf("status of meilisearch is not available: %s", resp.Status)
+	}
+	return nil
+}
+
+// Close closes the indexer. It only drops the reference to the client.
+func (i *Indexer) Close() {
+	if i == nil {
+		return
+	}
+	if i.Client == nil {
+		return
+	}
+	i.Client = nil
+}
diff --git a/modules/indexer/internal/meilisearch/util.go b/modules/indexer/internal/meilisearch/util.go
new file mode 100644
index 0000000000..e6d8fefade
--- /dev/null
+++ b/modules/indexer/internal/meilisearch/util.go
@@ -0,0 +1,38 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package meilisearch
+
+import (
+	"fmt"
+
+	"code.gitea.io/gitea/modules/log"
+)
+
+// VersionedIndexName returns the full index name with version
+func (i *Indexer) VersionedIndexName() string {
+	return versionedIndexName(i.indexName, i.version)
+}
+
+// versionedIndexName returns "<indexName>_v<version>", or the plain indexName for version 0.
+func versionedIndexName(indexName string, version int) string {
+	if version == 0 {
+		// Old index name without version
+		return indexName
+	}
+
+	// The format of the index name is <index_name>_v<version>, not <index_name>.v<version> like elasticsearch.
+	// Because meilisearch does not support "." in index name, it should contain only alphanumeric characters, hyphens (-) and underscores (_).
+	// See https://www.meilisearch.com/docs/learn/core_concepts/indexes#index-uid
+
+	return fmt.Sprintf("%s_v%d", indexName, version)
+}
+
+// checkOldIndexes logs a warning for every index of a version lower than i.version that can be fetched.
+// Nothing is deleted. Errors of GetIndex are handled as "the index does not exist".
+func (i *Indexer) checkOldIndexes() {
+	for v := 0; v < i.version; v++ {
+		indexName := versionedIndexName(i.indexName, v)
+		_, err := i.Client.GetIndex(indexName)
+		if err == nil {
+			log.Warn("Found older meilisearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName)
+		}
+	}
+}
|