diff options
author | techknowlogick <techknowlogick@gitea.io> | 2023-03-28 22:23:23 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-28 22:23:23 -0400 |
commit | 92c160d8e716cb3d05215a97cf521e843596f562 (patch) | |
tree | 0e99e7e1db24a825fbb9056f356c4692902759a0 /modules | |
parent | 265675a31c1939cb25ee659868559aec03fecb48 (diff) | |
download | gitea-92c160d8e716cb3d05215a97cf521e843596f562.tar.gz gitea-92c160d8e716cb3d05215a97cf521e843596f562.zip |
Add meilisearch support (#23136)
Add meilisearch support
Fixes #20665
Diffstat (limited to 'modules')
-rw-r--r-- | modules/indexer/issues/indexer.go | 17 | ||||
-rw-r--r-- | modules/indexer/issues/meilisearch.go | 183 | ||||
-rw-r--r-- | modules/setting/indexer.go | 15 |
3 files changed, 214 insertions, 1 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 55d3c7bc09..47a8b10794 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -107,7 +107,7 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { - case "bleve", "elasticsearch": + case "bleve", "elasticsearch", "meilisearch": handler := func(data ...queue.Data) []queue.Data { indexer := holder.get() if indexer == nil { @@ -220,6 +220,21 @@ func InitIssueIndexer(syncReindex bool) { issueIndexer := &DBIndexer{} holder.set(issueIndexer) graceful.GetManager().RunAtTerminate(finished) + case "meilisearch": + graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { + pprof.SetGoroutineLabels(ctx) + issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) + if err != nil { + log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) + } + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) + } + populate = !exist + holder.set(issueIndexer) + atTerminate(finished) + }) default: holder.cancel() log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) diff --git a/modules/indexer/issues/meilisearch.go b/modules/indexer/issues/meilisearch.go new file mode 100644 index 0000000000..5c45236e66 --- /dev/null +++ b/modules/indexer/issues/meilisearch.go @@ -0,0 +1,183 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package issues + +import ( + "context" + "strconv" + "sync" + "time" + + "github.com/meilisearch/meilisearch-go" +) + +var _ Indexer = &MeilisearchIndexer{} + +// MeilisearchIndexer implements Indexer interface +type MeilisearchIndexer struct { + client *meilisearch.Client + indexerName string + available bool + availabilityCallback func(bool) + stopTimer chan struct{} + lock sync.RWMutex +} + +// MeilisearchIndexer creates a new meilisearch indexer +func NewMeilisearchIndexer(url, apiKey, indexerName string) (*MeilisearchIndexer, error) { + client := meilisearch.NewClient(meilisearch.ClientConfig{ + Host: url, + APIKey: apiKey, + }) + + indexer := &MeilisearchIndexer{ + client: client, + indexerName: indexerName, + available: true, + stopTimer: make(chan struct{}), + } + + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case <-ticker.C: + indexer.checkAvailability() + case <-indexer.stopTimer: + ticker.Stop() + return + } + } + }() + + return indexer, nil +} + +// Init will initialize the indexer +func (b *MeilisearchIndexer) Init() (bool, error) { + _, err := b.client.GetIndex(b.indexerName) + if err == nil { + return true, nil + } + _, err = b.client.CreateIndex(&meilisearch.IndexConfig{ + Uid: b.indexerName, + PrimaryKey: "id", + }) + if err != nil { + return false, b.checkError(err) + } + + _, err = b.client.Index(b.indexerName).UpdateFilterableAttributes(&[]string{"repo_id"}) + return false, b.checkError(err) +} + +// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes +func (b *MeilisearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) { + b.lock.Lock() + defer b.lock.Unlock() + b.availabilityCallback = callback +} + +// Ping checks if meilisearch is available +func (b *MeilisearchIndexer) Ping() bool { + b.lock.RLock() + defer b.lock.RUnlock() + return b.available +} + +// Index will save the index data +func (b *MeilisearchIndexer) Index(issues []*IndexerData) error { + if len(issues) == 0 { + return nil + } + for _, issue := range issues { + _, err := b.client.Index(b.indexerName).AddDocuments(issue) + if err != nil { + return b.checkError(err) + } + } + // TODO: bulk send index data + return nil +} + +// Delete deletes indexes by ids +func (b *MeilisearchIndexer) Delete(ids ...int64) error { + if len(ids) == 0 { + return nil + } + + for _, id := range ids { + _, err := b.client.Index(b.indexerName).DeleteDocument(strconv.FormatInt(id, 10)) + if err != nil { + return b.checkError(err) + } + } + // TODO: bulk send deletes + return nil +} + +// Search searches for issues by given conditions. +// Returns the matching issue IDs +func (b *MeilisearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) { + filter := make([][]string, 0, len(repoIDs)) + for _, repoID := range repoIDs { + filter = append(filter, []string{"repo_id = " + strconv.FormatInt(repoID, 10)}) + } + searchRes, err := b.client.Index(b.indexerName).Search(keyword, &meilisearch.SearchRequest{ + Filter: filter, + Limit: int64(limit), + Offset: int64(start), + }) + if err != nil { + return nil, b.checkError(err) + } + + hits := make([]Match, 0, len(searchRes.Hits)) + for _, hit := range searchRes.Hits { + hits = append(hits, Match{ + ID: int64(hit.(map[string]interface{})["id"].(float64)), + }) + } + return &SearchResult{ + Total: searchRes.TotalHits, + Hits: hits, + }, nil +} + +// Close implements indexer +func (b *MeilisearchIndexer) Close() { + select { + case <-b.stopTimer: + default: + close(b.stopTimer) + } +} + +func (b *MeilisearchIndexer) checkError(err error) error { + return err +} + +func (b *MeilisearchIndexer) checkAvailability() { + _, err := b.client.Health() + if err != nil { + b.setAvailability(false) + return + } + b.setAvailability(true) +} + +func (b *MeilisearchIndexer) setAvailability(available bool) { + b.lock.Lock() + defer b.lock.Unlock() + + if b.available == available { + return + } + + b.available = available + if b.availabilityCallback != nil { + // Call the callback from within the lock to ensure that the ordering remains correct + b.availabilityCallback(b.available) + } +} diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go index 5b10018eb7..8aee8596de 100644 --- a/modules/setting/indexer.go +++ b/modules/setting/indexer.go @@ -4,6 +4,7 @@ package setting import ( + "net/url" "path/filepath" "strings" "time" @@ -18,6 +19,7 @@ var Indexer = struct { IssueType string IssuePath string IssueConnStr string + IssueConnAuth string IssueIndexerName string StartupTimeout time.Duration @@ -34,6 +36,7 @@ var Indexer = struct { IssueType: "bleve", IssuePath: "indexers/issues.bleve", IssueConnStr: "", + IssueConnAuth: "", IssueIndexerName: "gitea_issues", RepoIndexerEnabled: false, @@ -53,6 +56,18 @@ func loadIndexerFrom(rootCfg ConfigProvider) { Indexer.IssuePath = filepath.ToSlash(filepath.Join(AppWorkPath, Indexer.IssuePath)) } Indexer.IssueConnStr = sec.Key("ISSUE_INDEXER_CONN_STR").MustString(Indexer.IssueConnStr) + + if Indexer.IssueType == "meilisearch" { + u, err := url.Parse(Indexer.IssueConnStr) + if err != nil { + log.Warn("Failed to parse ISSUE_INDEXER_CONN_STR: %v", err) + u = &url.URL{} + } + Indexer.IssueConnAuth, _ = u.User.Password() + u.User = nil + Indexer.IssueConnStr = u.String() + } + Indexer.IssueIndexerName = sec.Key("ISSUE_INDEXER_NAME").MustString(Indexer.IssueIndexerName) // The following settings are deprecated and can be overridden by settings in [queue] or [queue.issue_indexer] |