From 5dbf36f356e67c6eb7df68727417702fa100bec5 Mon Sep 17 00:00:00 2001
From: Lunny Xiao
Date: Thu, 13 Feb 2020 14:06:17 +0800
Subject: Issue search support elasticsearch (#9428)

* Issue search support elasticsearch

* Fix lint

* Add indexer name on app.ini

* add a warnning on SearchIssuesByKeyword

* improve code
---
 modules/indexer/issues/bleve.go          |   2 +-
 modules/indexer/issues/elastic_search.go | 230 +++++++++++++++++++++++++++++++
 modules/indexer/issues/indexer.go        |  32 +++--
 modules/setting/indexer.go               |  30 ++--
 4 files changed, 273 insertions(+), 21 deletions(-)
 create mode 100644 modules/indexer/issues/elastic_search.go

(limited to 'modules')

diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go
index 787ff0dec5..655bc7dd17 100644
--- a/modules/indexer/issues/bleve.go
+++ b/modules/indexer/issues/bleve.go
@@ -170,7 +170,7 @@ func NewBleveIndexer(indexDir string) *BleveIndexer {
 	}
 }
 
-// Init will initial the indexer
+// Init will initialize the indexer
 func (b *BleveIndexer) Init() (bool, error) {
 	var err error
 	b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion)
diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go
new file mode 100644
index 0000000000..1f9c59965c
--- /dev/null
+++ b/modules/indexer/issues/elastic_search.go
@@ -0,0 +1,230 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strconv"
+	"time"
+
+	"code.gitea.io/gitea/modules/log"
+
+	"github.com/olivere/elastic/v7"
+)
+
+var _ Indexer = &ElasticSearchIndexer{}
+
+// ElasticSearchIndexer implements the Indexer interface on top of an Elasticsearch index
+type ElasticSearchIndexer struct {
+	client      *elastic.Client
+	indexerName string
+}
+
+// elasticLogger exposes a Gitea logger through the Printf method handed to the elastic client
+type elasticLogger struct {
+	*log.Logger
+}
+
+func (l elasticLogger) Printf(format string, args ...interface{}) {
+	_ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
+}
+
+// NewElasticSearchIndexer creates an indexer that talks to url and stores issues in the index indexerName
+func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, error) {
+	opts := []elastic.ClientOptionFunc{
+		elastic.SetURL(url),
+		elastic.SetSniff(false),
+		elastic.SetHealthcheckInterval(10 * time.Second),
+		elastic.SetGzip(false),
+	}
+
+	logger := elasticLogger{log.GetLogger(log.DEFAULT)}
+
+	if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
+		opts = append(opts, elastic.SetTraceLog(logger))
+	} else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
+		opts = append(opts, elastic.SetErrorLog(logger))
+	} else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
+		opts = append(opts, elastic.SetInfoLog(logger))
+	}
+
+	client, err := elastic.NewClient(opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return &ElasticSearchIndexer{
+		client:      client,
+		indexerName: indexerName,
+	}, nil
+}
+
+const (
+	defaultMapping = `{
+		"mappings": {
+			"properties": {
+				"id": {
+					"type": "long",
+					"index": true
+				},
+				"repo_id": {
+					"type": "long",
+					"index": true
+				},
+				"title": {
+					"type": "text",
+					"index": true
+				},
+				"content": {
+					"type": "text",
+					"index": true
+				},
+				"comments": {
+					"type" : "text",
+					"index": true
+				}
+			}
+		}
+	}`
+)
+
+// Init creates the index with defaultMapping when it is missing; it returns true only if the index already existed
+func (b *ElasticSearchIndexer) Init() (bool, error) {
+	ctx := context.Background()
+	exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	if !exists {
+		createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(defaultMapping).Do(ctx)
+		if err != nil {
+			return false, err
+		}
+		if !createIndex.Acknowledged {
+			return false, errors.New("init failed")
+		}
+
+		return false, nil
+	}
+	return true, nil
+}
+
+// Index stores the given issues, keyed by issue ID; more than one issue is sent as a single bulk request
+func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
+	if len(issues) == 0 {
+		return nil
+	} else if len(issues) == 1 {
+		issue := issues[0]
+		_, err := b.client.Index().
+			Index(b.indexerName).
+			Id(fmt.Sprintf("%d", issue.ID)).
+			BodyJson(map[string]interface{}{
+				"id":       issue.ID,
+				"repo_id":  issue.RepoID,
+				"title":    issue.Title,
+				"content":  issue.Content,
+				"comments": issue.Comments,
+			}).
+			Do(context.Background())
+		return err
+	}
+
+	reqs := make([]elastic.BulkableRequest, 0, len(issues))
+	for _, issue := range issues {
+		reqs = append(reqs,
+			elastic.NewBulkIndexRequest().
+				Index(b.indexerName).
+				Id(fmt.Sprintf("%d", issue.ID)).
+				Doc(map[string]interface{}{
+					"id":       issue.ID,
+					"repo_id":  issue.RepoID,
+					"title":    issue.Title,
+					"content":  issue.Content,
+					"comments": issue.Comments,
+				}),
+		)
+	}
+
+	_, err := b.client.Bulk().
+		Index(b.indexerName).
+		Add(reqs...).
+		Do(context.Background())
+	return err
+}
+
+// Delete removes the documents with the given issue ids; more than one id is sent as a single bulk request
+func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
+	if len(ids) == 0 {
+		return nil
+	} else if len(ids) == 1 {
+		_, err := b.client.Delete().
+			Index(b.indexerName).
+			Id(fmt.Sprintf("%d", ids[0])).
+			Do(context.Background())
+		return err
+	}
+
+	reqs := make([]elastic.BulkableRequest, 0, len(ids))
+	for _, id := range ids {
+		reqs = append(reqs,
+			elastic.NewBulkDeleteRequest().
+				Index(b.indexerName).
+				Id(fmt.Sprintf("%d", id)),
+		)
+	}
+
+	_, err := b.client.Bulk().
+		Index(b.indexerName).
+		Add(reqs...).
+		Do(context.Background())
+	return err
+}
+
+// Search matches keyword against title, content and comments, optionally restricted to repoIDs.
+// Returns the matching issue IDs ordered by id, skipping start hits and returning at most limit
+func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+	kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
+	query := elastic.NewBoolQuery()
+	query = query.Must(kwQuery)
+	if len(repoIDs) > 0 {
+		var repoStrs = make([]interface{}, 0, len(repoIDs))
+		for _, repoID := range repoIDs {
+			repoStrs = append(repoStrs, repoID)
+		}
+		repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
+		query = query.Must(repoQuery)
+	}
+	searchResult, err := b.client.Search().
+		Index(b.indexerName).
+		Query(query).
+		Sort("id", true).
+		From(start).Size(limit).
+		Do(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	// Size by the hits actually returned: limit is caller-supplied and may be negative or far too large.
+	hits := make([]Match, 0, len(searchResult.Hits.Hits))
+	for _, hit := range searchResult.Hits.Hits {
+		id, err := strconv.ParseInt(hit.Id, 10, 64)
+		if err != nil {
+			log.Error("ElasticSearchIndexer.Search: skipping hit with invalid id %q: %v", hit.Id, err)
+			continue
+		}
+		hits = append(hits, Match{ID: id})
+	}
+
+	return &SearchResult{
+		Total: searchResult.TotalHits(),
+		Hits:  hits,
+	}, nil
+}
+
+// Close implements Indexer; it is currently a no-op
+func (b *ElasticSearchIndexer) Close() {}
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index c942013e34..d2233ac6a8 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -21,13 +21,13 @@ import (
 
 // IndexerData data stored in the issue indexer
 type IndexerData struct {
-	ID       int64
-	RepoID   int64
-	Title    string
-	Content  string
-	Comments []string
-	IsDelete bool
-	IDs      []int64
+	ID       int64    `json:"id"`
+	RepoID   int64    `json:"repo_id"`
+	Title    string   `json:"title"`
+	Content  string   `json:"content"`
+	Comments []string `json:"comments"`
+	IsDelete bool     `json:"is_delete"`
+	IDs      []int64  `json:"ids"`
 }
 
 // Match represents on search result
@@ -100,7 +100,7 @@ func InitIssueIndexer(syncReindex bool) {
 
 	// Create the Queue
 	switch setting.Indexer.IssueType {
-	case "bleve":
+	case "bleve", "elasticsearch":
 		handler := func(data ...queue.Data) {
 			indexer := holder.get()
 			if indexer == nil {
@@ -160,6 +160,19 @@ func InitIssueIndexer(syncReindex bool) {
 				log.Info("PID: %d Issue Indexer closed", os.Getpid())
 			})
 			log.Debug("Created Bleve Indexer")
+		case "elasticsearch":
+			graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+				issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
+				if err != nil {
+					log.Fatal("Unable to initialize Elastic Search Issue Indexer: %v", err)
+				}
+				exist, err := issueIndexer.Init()
+				if err != nil {
+					log.Fatal("Unable to issueIndexer.Init: %v", err)
+				}
+				populate = !exist
+				holder.set(issueIndexer)
+			})
 		case "db":
 			issueIndexer := &DBIndexer{}
 			holder.set(issueIndexer)
@@ -308,6 +321,7 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
 }
 
 // SearchIssuesByKeyword search issue ids by keywords and repo id
+// WARNING: You have to ensure the user has permission to visit repoIDs' issues
 func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
 	var issueIDs []int64
 	indexer := holder.get()
@@ -316,7 +330,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
 		log.Error("SearchIssuesByKeyword(): unable to get indexer!")
 		return nil, fmt.Errorf("unable to get issue indexer")
 	}
-	res, err := indexer.Search(keyword, repoIDs, 1000, 0)
+	res, err := indexer.Search(keyword, repoIDs, 50, 0)
 	if err != nil {
 		return nil, err
 	}
diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go
index 589e7bf864..859535281c 100644
--- a/modules/setting/indexer.go
+++ b/modules/setting/indexer.go
@@ -27,20 +27,25 @@ var (
 	Indexer = struct {
 		IssueType             string
 		IssuePath             string
-		RepoIndexerEnabled    bool
-		RepoPath              string
-		UpdateQueueLength     int
-		MaxIndexerFileSize    int64
+		IssueConnStr          string
+		IssueIndexerName      string
 		IssueQueueType        string
 		IssueQueueDir         string
 		IssueQueueConnStr     string
 		IssueQueueBatchNumber int
 		StartupTimeout        time.Duration
-		IncludePatterns       []glob.Glob
-		ExcludePatterns       []glob.Glob
+
+		RepoIndexerEnabled bool
+		RepoPath           string
+		UpdateQueueLength  int
+		MaxIndexerFileSize int64
+		IncludePatterns    []glob.Glob
+		ExcludePatterns    []glob.Glob
 	}{
 		IssueType:             "bleve",
 		IssuePath:             "indexers/issues.bleve",
+		IssueConnStr:          "",
+		IssueIndexerName:      "gitea_issues",
 		IssueQueueType:        LevelQueueType,
 		IssueQueueDir:         "indexers/issues.queue",
 		IssueQueueConnStr:     "",
@@ -57,6 +62,14 @@ func newIndexerService() {
 	if !filepath.IsAbs(Indexer.IssuePath) {
 		Indexer.IssuePath = path.Join(AppWorkPath, Indexer.IssuePath)
 	}
+	Indexer.IssueConnStr = sec.Key("ISSUE_INDEXER_CONN_STR").MustString(Indexer.IssueConnStr)
+	Indexer.IssueIndexerName = sec.Key("ISSUE_INDEXER_NAME").MustString(Indexer.IssueIndexerName)
+
+	Indexer.IssueQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
+	Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
+	Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, ""))
+	Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
+
 	Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
 	Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve"))
 	if !filepath.IsAbs(Indexer.RepoPath) {
@@ -64,13 +77,8 @@ func newIndexerService() {
 	}
 	Indexer.IncludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_INCLUDE").MustString(""))
 	Indexer.ExcludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_EXCLUDE").MustString(""))
-
 	Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
 	Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
-	Indexer.IssueQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
-	Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
-	Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, ""))
-	Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
 	Indexer.StartupTimeout = sec.Key("STARTUP_TIMEOUT").MustDuration(30 * time.Second)
 }
 
-- 
cgit v1.2.3