summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2020-02-13 14:06:17 +0800
committerGitHub <noreply@github.com>2020-02-13 14:06:17 +0800
commit5dbf36f356e67c6eb7df68727417702fa100bec5 (patch)
tree7c624c1302be1515a7c63a4e86ebda75d1d3b580 /modules
parent17656021f165f8011e5f462f0336a083321c9e20 (diff)
downloadgitea-5dbf36f356e67c6eb7df68727417702fa100bec5.tar.gz
gitea-5dbf36f356e67c6eb7df68727417702fa100bec5.zip
Issue search support elasticsearch (#9428)
* Issue search support elasticsearch * Fix lint * Add indexer name on app.ini * add a warning on SearchIssuesByKeyword * improve code
Diffstat (limited to 'modules')
-rw-r--r--modules/indexer/issues/bleve.go2
-rw-r--r--modules/indexer/issues/elastic_search.go230
-rw-r--r--modules/indexer/issues/indexer.go32
-rw-r--r--modules/setting/indexer.go30
4 files changed, 273 insertions, 21 deletions
diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go
index 787ff0dec5..655bc7dd17 100644
--- a/modules/indexer/issues/bleve.go
+++ b/modules/indexer/issues/bleve.go
@@ -170,7 +170,7 @@ func NewBleveIndexer(indexDir string) *BleveIndexer {
}
}
-// Init will initial the indexer
+// Init will initialize the indexer
func (b *BleveIndexer) Init() (bool, error) {
var err error
b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion)
diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go
new file mode 100644
index 0000000000..1f9c59965c
--- /dev/null
+++ b/modules/indexer/issues/elastic_search.go
@@ -0,0 +1,230 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+
+ "github.com/olivere/elastic/v7"
+)
+
+var (
+ _ Indexer = &ElasticSearchIndexer{}
+)
+
+// ElasticSearchIndexer implements Indexer interface
+type ElasticSearchIndexer struct {
+ client *elastic.Client
+ indexerName string
+}
+
+type elasticLogger struct {
+ *log.Logger
+}
+
+func (l elasticLogger) Printf(format string, args ...interface{}) {
+ _ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
+}
+
+// NewElasticSearchIndexer creates a new elasticsearch indexer
+func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, error) {
+ opts := []elastic.ClientOptionFunc{
+ elastic.SetURL(url),
+ elastic.SetSniff(false),
+ elastic.SetHealthcheckInterval(10 * time.Second),
+ elastic.SetGzip(false),
+ }
+
+ logger := elasticLogger{log.GetLogger(log.DEFAULT)}
+
+ if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
+ opts = append(opts, elastic.SetTraceLog(logger))
+ } else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
+ opts = append(opts, elastic.SetErrorLog(logger))
+ } else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
+ opts = append(opts, elastic.SetInfoLog(logger))
+ }
+
+ client, err := elastic.NewClient(opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ return &ElasticSearchIndexer{
+ client: client,
+ indexerName: indexerName,
+ }, nil
+}
+
+const (
+ defaultMapping = `{
+ "mappings": {
+ "properties": {
+ "id": {
+ "type": "integer",
+ "index": true
+ },
+ "repo_id": {
+ "type": "integer",
+ "index": true
+ },
+ "title": {
+ "type": "text",
+ "index": true
+ },
+ "content": {
+ "type": "text",
+ "index": true
+ },
+ "comments": {
+ "type" : "text",
+ "index": true
+ }
+ }
+ }
+ }`
+)
+
+// Init will initialize the indexer
+func (b *ElasticSearchIndexer) Init() (bool, error) {
+ ctx := context.Background()
+ exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
+ if err != nil {
+ return false, err
+ }
+
+ if !exists {
+ var mapping = defaultMapping
+
+ createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
+ if err != nil {
+ return false, err
+ }
+ if !createIndex.Acknowledged {
+ return false, errors.New("init failed")
+ }
+
+ return false, nil
+ }
+ return true, nil
+}
+
+// Index will save the index data
+func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
+ if len(issues) == 0 {
+ return nil
+ } else if len(issues) == 1 {
+ issue := issues[0]
+ _, err := b.client.Index().
+ Index(b.indexerName).
+ Id(fmt.Sprintf("%d", issue.ID)).
+ BodyJson(map[string]interface{}{
+ "id": issue.ID,
+ "repo_id": issue.RepoID,
+ "title": issue.Title,
+ "content": issue.Content,
+ "comments": issue.Comments,
+ }).
+ Do(context.Background())
+ return err
+ }
+
+ reqs := make([]elastic.BulkableRequest, 0)
+ for _, issue := range issues {
+ reqs = append(reqs,
+ elastic.NewBulkIndexRequest().
+ Index(b.indexerName).
+ Id(fmt.Sprintf("%d", issue.ID)).
+ Doc(map[string]interface{}{
+ "id": issue.ID,
+ "repo_id": issue.RepoID,
+ "title": issue.Title,
+ "content": issue.Content,
+ "comments": issue.Comments,
+ }),
+ )
+ }
+
+ _, err := b.client.Bulk().
+ Index(b.indexerName).
+ Add(reqs...).
+ Do(context.Background())
+ return err
+}
+
+// Delete deletes indexes by ids
+func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
+ if len(ids) == 0 {
+ return nil
+ } else if len(ids) == 1 {
+ _, err := b.client.Delete().
+ Index(b.indexerName).
+ Id(fmt.Sprintf("%d", ids[0])).
+ Do(context.Background())
+ return err
+ }
+
+ reqs := make([]elastic.BulkableRequest, 0)
+ for _, id := range ids {
+ reqs = append(reqs,
+ elastic.NewBulkDeleteRequest().
+ Index(b.indexerName).
+ Id(fmt.Sprintf("%d", id)),
+ )
+ }
+
+ _, err := b.client.Bulk().
+ Index(b.indexerName).
+ Add(reqs...).
+ Do(context.Background())
+ return err
+}
+
+// Search searches for issues by given conditions.
+// Returns the matching issue IDs
+func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
+ kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
+ query := elastic.NewBoolQuery()
+ query = query.Must(kwQuery)
+ if len(repoIDs) > 0 {
+ var repoStrs = make([]interface{}, 0, len(repoIDs))
+ for _, repoID := range repoIDs {
+ repoStrs = append(repoStrs, repoID)
+ }
+ repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
+ query = query.Must(repoQuery)
+ }
+ searchResult, err := b.client.Search().
+ Index(b.indexerName).
+ Query(query).
+ Sort("id", true).
+ From(start).Size(limit).
+ Do(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ hits := make([]Match, 0, limit)
+ for _, hit := range searchResult.Hits.Hits {
+ id, _ := strconv.ParseInt(hit.Id, 10, 64)
+ hits = append(hits, Match{
+ ID: id,
+ })
+ }
+
+ return &SearchResult{
+ Total: searchResult.TotalHits(),
+ Hits: hits,
+ }, nil
+}
+
+// Close implements indexer
+func (b *ElasticSearchIndexer) Close() {}
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index c942013e34..d2233ac6a8 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -21,13 +21,13 @@ import (
// IndexerData data stored in the issue indexer
type IndexerData struct {
- ID int64
- RepoID int64
- Title string
- Content string
- Comments []string
- IsDelete bool
- IDs []int64
+ ID int64 `json:"id"`
+ RepoID int64 `json:"repo_id"`
+ Title string `json:"title"`
+ Content string `json:"content"`
+ Comments []string `json:"comments"`
+ IsDelete bool `json:"is_delete"`
+ IDs []int64 `json:"ids"`
}
// Match represents one search result
@@ -100,7 +100,7 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
- case "bleve":
+ case "bleve", "elasticsearch":
handler := func(data ...queue.Data) {
indexer := holder.get()
if indexer == nil {
@@ -160,6 +160,19 @@ func InitIssueIndexer(syncReindex bool) {
log.Info("PID: %d Issue Indexer closed", os.Getpid())
})
log.Debug("Created Bleve Indexer")
+ case "elasticsearch":
+ graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+ issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, "gitea_issues")
+ if err != nil {
+ log.Fatal("Unable to initialize Elastic Search Issue Indexer: %v", err)
+ }
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ log.Fatal("Unable to issueIndexer.Init: %v", err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
+ })
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
@@ -308,6 +321,7 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
}
// SearchIssuesByKeyword search issue ids by keywords and repo id
+// WARNING: You have to ensure that the user has permission to visit the repoIDs' issues
func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
indexer := holder.get()
@@ -316,7 +330,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
log.Error("SearchIssuesByKeyword(): unable to get indexer!")
return nil, fmt.Errorf("unable to get issue indexer")
}
- res, err := indexer.Search(keyword, repoIDs, 1000, 0)
+ res, err := indexer.Search(keyword, repoIDs, 50, 0)
if err != nil {
return nil, err
}
diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go
index 589e7bf864..859535281c 100644
--- a/modules/setting/indexer.go
+++ b/modules/setting/indexer.go
@@ -27,20 +27,25 @@ var (
Indexer = struct {
IssueType string
IssuePath string
- RepoIndexerEnabled bool
- RepoPath string
- UpdateQueueLength int
- MaxIndexerFileSize int64
+ IssueConnStr string
+ IssueIndexerName string
IssueQueueType string
IssueQueueDir string
IssueQueueConnStr string
IssueQueueBatchNumber int
StartupTimeout time.Duration
- IncludePatterns []glob.Glob
- ExcludePatterns []glob.Glob
+
+ RepoIndexerEnabled bool
+ RepoPath string
+ UpdateQueueLength int
+ MaxIndexerFileSize int64
+ IncludePatterns []glob.Glob
+ ExcludePatterns []glob.Glob
}{
IssueType: "bleve",
IssuePath: "indexers/issues.bleve",
+ IssueConnStr: "",
+ IssueIndexerName: "gitea_issues",
IssueQueueType: LevelQueueType,
IssueQueueDir: "indexers/issues.queue",
IssueQueueConnStr: "",
@@ -57,6 +62,14 @@ func newIndexerService() {
if !filepath.IsAbs(Indexer.IssuePath) {
Indexer.IssuePath = path.Join(AppWorkPath, Indexer.IssuePath)
}
+ Indexer.IssueConnStr = sec.Key("ISSUE_INDEXER_CONN_STR").MustString(Indexer.IssueConnStr)
+ Indexer.IssueIndexerName = sec.Key("ISSUE_INDEXER_NAME").MustString(Indexer.IssueIndexerName)
+
+ Indexer.IssueQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
+ Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
+ Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, ""))
+ Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
+
Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve"))
if !filepath.IsAbs(Indexer.RepoPath) {
@@ -64,13 +77,8 @@ func newIndexerService() {
}
Indexer.IncludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_INCLUDE").MustString(""))
Indexer.ExcludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_EXCLUDE").MustString(""))
-
Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
- Indexer.IssueQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
- Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
- Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, ""))
- Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
Indexer.StartupTimeout = sec.Key("STARTUP_TIMEOUT").MustDuration(30 * time.Second)
}