summaryrefslogtreecommitdiffstats
path: root/modules/indexer
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2020-08-31 00:08:01 +0800
committerGitHub <noreply@github.com>2020-08-30 19:08:01 +0300
commit9bc69ff26eeebaf3b622d62d18c757ff1f401dda (patch)
tree69ff71d9d460e83a6fff54b172b604732ab5d065 /modules/indexer
parentd257485bc0026c9717fe7bf4c9953ad1b7a1a9ae (diff)
downloadgitea-9bc69ff26eeebaf3b622d62d18c757ff1f401dda.tar.gz
gitea-9bc69ff26eeebaf3b622d62d18c757ff1f401dda.zip
Support elastic search for code search (#10273)
* Support elastic search for code search * Finished elastic search implementation and add some tests * Enable test on drone and added docs * Add new fields to elastic search * Fix bug * remove unused changes * Use indexer alias to keep the gitea indexer version * Improve codes * Some code improvements * The real indexer name changed to xxx.v1 Co-authored-by: zeripath <art27@cantab.net>
Diffstat (limited to 'modules/indexer')
-rw-r--r--modules/indexer/code/bleve.go132
-rw-r--r--modules/indexer/code/bleve_test.go53
-rw-r--r--modules/indexer/code/elastic_search.go385
-rw-r--r--modules/indexer/code/elastic_search_test.go36
-rw-r--r--modules/indexer/code/indexer.go90
-rw-r--r--modules/indexer/code/indexer_test.go83
-rw-r--r--modules/indexer/code/queue.go30
-rw-r--r--modules/indexer/code/wrapped.go6
8 files changed, 658 insertions, 157 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go
index 6502259ba4..81373bf3da 100644
--- a/modules/indexer/code/bleve.go
+++ b/modules/indexer/code/bleve.go
@@ -58,10 +58,10 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
})
}
-// openIndexer open the index at the specified path, checking for metadata
+// openBleveIndexer open the index at the specified path, checking for metadata
// updates and bleve version updates. If index needs to be created (or
// re-created), returns (nil, nil)
-func openIndexer(path string, latestVersion int) (bleve.Index, error) {
+func openBleveIndexer(path string, latestVersion int) (bleve.Index, error) {
_, err := os.Stat(path)
if err != nil && os.IsNotExist(err) {
return nil, nil
@@ -104,54 +104,14 @@ func (d *RepoIndexerData) Type() string {
return repoIndexerDocType
}
-func addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
- // Ignore vendored files in code search
- if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
- return nil
- }
- stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
- RunInDir(repo.RepoPath())
- if err != nil {
- return err
- }
- if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil {
- return fmt.Errorf("Misformatted git cat-file output: %v", err)
- } else if int64(size) > setting.Indexer.MaxIndexerFileSize {
- return addDelete(update.Filename, repo, batch)
- }
-
- fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
- RunInDirBytes(repo.RepoPath())
- if err != nil {
- return err
- } else if !base.IsTextFile(fileContents) {
- // FIXME: UTF-16 files will probably fail here
- return nil
- }
-
- id := filenameIndexerID(repo.ID, update.Filename)
- return batch.Index(id, &RepoIndexerData{
- RepoID: repo.ID,
- CommitID: commitSha,
- Content: string(charset.ToUTF8DropErrors(fileContents)),
- Language: analyze.GetCodeLanguage(update.Filename, fileContents),
- UpdatedAt: time.Now().UTC(),
- })
-}
-
-func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
- id := filenameIndexerID(repo.ID, filename)
- return batch.Delete(id)
-}
-
const (
repoIndexerAnalyzer = "repoIndexerAnalyzer"
repoIndexerDocType = "repoIndexerDocType"
repoIndexerLatestVersion = 5
)
-// createRepoIndexer create a repo indexer if one does not already exist
-func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) {
+// createBleveIndexer create a bleve repo indexer if one does not already exist
+func createBleveIndexer(path string, latestVersion int) (bleve.Index, error) {
docMapping := bleve.NewDocumentMapping()
numericFieldMapping := bleve.NewNumericFieldMapping()
numericFieldMapping.IncludeInAll = false
@@ -199,18 +159,6 @@ func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) {
return indexer, nil
}
-func filenameIndexerID(repoID int64, filename string) string {
- return indexerID(repoID) + "_" + filename
-}
-
-func filenameOfIndexerID(indexerID string) string {
- index := strings.IndexByte(indexerID, '_')
- if index == -1 {
- log.Error("Unexpected ID in repo indexer: %s", indexerID)
- }
- return indexerID[index+1:]
-}
-
var (
_ Indexer = &BleveIndexer{}
)
@@ -230,10 +178,51 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
return indexer, created, err
}
+func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
+ // Ignore vendored files in code search
+ if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
+ return nil
+ }
+
+ stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
+ RunInDir(repo.RepoPath())
+ if err != nil {
+ return err
+ }
+ if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil {
+ return fmt.Errorf("Misformatted git cat-file output: %v", err)
+ } else if int64(size) > setting.Indexer.MaxIndexerFileSize {
+ return b.addDelete(update.Filename, repo, batch)
+ }
+
+ fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
+ RunInDirBytes(repo.RepoPath())
+ if err != nil {
+ return err
+ } else if !base.IsTextFile(fileContents) {
+ // FIXME: UTF-16 files will probably fail here
+ return nil
+ }
+
+ id := filenameIndexerID(repo.ID, update.Filename)
+ return batch.Index(id, &RepoIndexerData{
+ RepoID: repo.ID,
+ CommitID: commitSha,
+ Content: string(charset.ToUTF8DropErrors(fileContents)),
+ Language: analyze.GetCodeLanguage(update.Filename, fileContents),
+ UpdatedAt: time.Now().UTC(),
+ })
+}
+
+func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
+ id := filenameIndexerID(repo.ID, filename)
+ return batch.Delete(id)
+}
+
// init init the indexer
func (b *BleveIndexer) init() (bool, error) {
var err error
- b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion)
+ b.indexer, err = openBleveIndexer(b.indexDir, repoIndexerLatestVersion)
if err != nil {
return false, err
}
@@ -241,7 +230,7 @@ func (b *BleveIndexer) init() (bool, error) {
return false, nil
}
- b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion)
+ b.indexer, err = createBleveIndexer(b.indexDir, repoIndexerLatestVersion)
if err != nil {
return false, err
}
@@ -262,38 +251,19 @@ func (b *BleveIndexer) Close() {
}
// Index indexes the data
-func (b *BleveIndexer) Index(repoID int64) error {
- repo, err := models.GetRepositoryByID(repoID)
- if err != nil {
- return err
- }
-
- sha, err := getDefaultBranchSha(repo)
- if err != nil {
- return err
- }
- changes, err := getRepoChanges(repo, sha)
- if err != nil {
- return err
- } else if changes == nil {
- return nil
- }
-
+func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, update := range changes.Updates {
- if err := addUpdate(sha, update, repo, batch); err != nil {
+ if err := b.addUpdate(sha, update, repo, batch); err != nil {
return err
}
}
for _, filename := range changes.RemovedFilenames {
- if err := addDelete(filename, repo, batch); err != nil {
+ if err := b.addDelete(filename, repo, batch); err != nil {
return err
}
}
- if err = batch.Flush(); err != nil {
- return err
- }
- return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
+ return batch.Flush()
}
// Delete deletes indexes by ids
diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go
index 2b3128ac88..f79957220f 100644
--- a/modules/indexer/code/bleve_test.go
+++ b/modules/indexer/code/bleve_test.go
@@ -6,21 +6,15 @@ package code
import (
"io/ioutil"
- "path/filepath"
"testing"
"code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
"github.com/stretchr/testify/assert"
)
-func TestMain(m *testing.M) {
- models.MainTest(m, filepath.Join("..", "..", ".."))
-}
-
-func TestIndexAndSearch(t *testing.T) {
+func TestBleveIndexAndSearch(t *testing.T) {
models.PrepareTestEnv(t)
dir, err := ioutil.TempDir("", "bleve.index")
@@ -31,10 +25,9 @@ func TestIndexAndSearch(t *testing.T) {
}
defer util.RemoveAll(dir)
- setting.Indexer.RepoIndexerEnabled = true
idx, _, err := NewBleveIndexer(dir)
if err != nil {
- assert.Fail(t, "Unable to create indexer Error: %v", err)
+ assert.Fail(t, "Unable to create bleve indexer Error: %v", err)
if idx != nil {
idx.Close()
}
@@ -42,45 +35,5 @@ func TestIndexAndSearch(t *testing.T) {
}
defer idx.Close()
- err = idx.Index(1)
- assert.NoError(t, err)
-
- var (
- keywords = []struct {
- Keyword string
- IDs []int64
- Langs int
- }{
- {
- Keyword: "Description",
- IDs: []int64{1},
- Langs: 1,
- },
- {
- Keyword: "repo1",
- IDs: []int64{1},
- Langs: 1,
- },
- {
- Keyword: "non-exist",
- IDs: []int64{},
- Langs: 0,
- },
- }
- )
-
- for _, kw := range keywords {
- total, res, langs, err := idx.Search(nil, "", kw.Keyword, 1, 10)
- assert.NoError(t, err)
- assert.EqualValues(t, len(kw.IDs), total)
-
- assert.NotNil(t, langs)
- assert.Len(t, langs, kw.Langs)
-
- var ids = make([]int64, 0, len(res))
- for _, hit := range res {
- ids = append(ids, hit.RepoID)
- }
- assert.EqualValues(t, kw.IDs, ids)
- }
+ testIndexer("beleve", t, idx)
}
diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go
new file mode 100644
index 0000000000..4f690ed806
--- /dev/null
+++ b/modules/indexer/code/elastic_search.go
@@ -0,0 +1,385 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package code
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/analyze"
+ "code.gitea.io/gitea/modules/base"
+ "code.gitea.io/gitea/modules/charset"
+ "code.gitea.io/gitea/modules/git"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/timeutil"
+
+ "github.com/go-enry/go-enry/v2"
+ "github.com/olivere/elastic/v7"
+)
+
+const (
+ esRepoIndexerLatestVersion = 1
+)
+
+var (
+ _ Indexer = &ElasticSearchIndexer{}
+)
+
+// ElasticSearchIndexer implements Indexer interface
+type ElasticSearchIndexer struct {
+ client *elastic.Client
+ indexerAliasName string
+}
+
+type elasticLogger struct {
+ *log.Logger
+}
+
+func (l elasticLogger) Printf(format string, args ...interface{}) {
+ _ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
+}
+
+// NewElasticSearchIndexer creates a new elasticsearch indexer
+func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bool, error) {
+ opts := []elastic.ClientOptionFunc{
+ elastic.SetURL(url),
+ elastic.SetSniff(false),
+ elastic.SetHealthcheckInterval(10 * time.Second),
+ elastic.SetGzip(false),
+ }
+
+ logger := elasticLogger{log.GetLogger(log.DEFAULT)}
+
+ if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
+ opts = append(opts, elastic.SetTraceLog(logger))
+ } else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
+ opts = append(opts, elastic.SetErrorLog(logger))
+ } else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
+ opts = append(opts, elastic.SetInfoLog(logger))
+ }
+
+ client, err := elastic.NewClient(opts...)
+ if err != nil {
+ return nil, false, err
+ }
+
+ indexer := &ElasticSearchIndexer{
+ client: client,
+ indexerAliasName: indexerName,
+ }
+ exists, err := indexer.init()
+
+ return indexer, !exists, err
+}
+
+const (
+ defaultMapping = `{
+ "mappings": {
+ "properties": {
+ "repo_id": {
+ "type": "long",
+ "index": true
+ },
+ "content": {
+ "type": "text",
+ "index": true
+ },
+ "commit_id": {
+ "type": "keyword",
+ "index": true
+ },
+ "language": {
+ "type": "keyword",
+ "index": true
+ },
+ "updated_at": {
+ "type": "long",
+ "index": true
+ }
+ }
+ }
+ }`
+)
+
+func (b *ElasticSearchIndexer) realIndexerName() string {
+ return fmt.Sprintf("%s.v%d", b.indexerAliasName, esRepoIndexerLatestVersion)
+}
+
+// Init will initialize the indexer
+func (b *ElasticSearchIndexer) init() (bool, error) {
+ ctx := context.Background()
+ exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx)
+ if err != nil {
+ return false, err
+ }
+ if !exists {
+ var mapping = defaultMapping
+
+ createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx)
+ if err != nil {
+ return false, err
+ }
+ if !createIndex.Acknowledged {
+ return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping)
+ }
+ }
+
+ // check version
+ r, err := b.client.Aliases().Do(ctx)
+ if err != nil {
+ return false, err
+ }
+
+ realIndexerNames := r.IndicesByAlias(b.indexerAliasName)
+ if len(realIndexerNames) < 1 {
+ res, err := b.client.Alias().
+ Add(b.realIndexerName(), b.indexerAliasName).
+ Do(ctx)
+ if err != nil {
+ return false, err
+ }
+ if !res.Acknowledged {
+ return false, fmt.Errorf("")
+ }
+ } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() {
+ log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.",
+ realIndexerNames[0], b.realIndexerName())
+ res, err := b.client.Alias().
+ Remove(realIndexerNames[0], b.indexerAliasName).
+ Add(b.realIndexerName(), b.indexerAliasName).
+ Do(ctx)
+ if err != nil {
+ return false, err
+ }
+ if !res.Acknowledged {
+ return false, fmt.Errorf("")
+ }
+ }
+
+ return exists, nil
+}
+
+func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
+ stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
+ RunInDir(repo.RepoPath())
+ if err != nil {
+ return nil, err
+ }
+ if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil {
+ return nil, fmt.Errorf("Misformatted git cat-file output: %v", err)
+ } else if int64(size) > setting.Indexer.MaxIndexerFileSize {
+ return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil
+ }
+
+ fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
+ RunInDirBytes(repo.RepoPath())
+ if err != nil {
+ return nil, err
+ } else if !base.IsTextFile(fileContents) {
+ // FIXME: UTF-16 files will probably fail here
+ return nil, nil
+ }
+
+ id := filenameIndexerID(repo.ID, update.Filename)
+
+ return []elastic.BulkableRequest{
+ elastic.NewBulkIndexRequest().
+ Index(b.indexerAliasName).
+ Id(id).
+ Doc(map[string]interface{}{
+ "repo_id": repo.ID,
+ "content": string(charset.ToUTF8DropErrors(fileContents)),
+ "commit_id": sha,
+ "language": analyze.GetCodeLanguage(update.Filename, fileContents),
+ "updated_at": timeutil.TimeStampNow(),
+ }),
+ }, nil
+}
+
+func (b *ElasticSearchIndexer) addDelete(filename string, repo *models.Repository) elastic.BulkableRequest {
+ id := filenameIndexerID(repo.ID, filename)
+ return elastic.NewBulkDeleteRequest().
+ Index(b.indexerAliasName).
+ Id(id)
+}
+
+// Index will save the index data
+func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
+ reqs := make([]elastic.BulkableRequest, 0)
+ for _, update := range changes.Updates {
+ updateReqs, err := b.addUpdate(sha, update, repo)
+ if err != nil {
+ return err
+ }
+ if len(updateReqs) > 0 {
+ reqs = append(reqs, updateReqs...)
+ }
+ }
+
+ for _, filename := range changes.RemovedFilenames {
+ reqs = append(reqs, b.addDelete(filename, repo))
+ }
+
+ if len(reqs) > 0 {
+ _, err := b.client.Bulk().
+ Index(b.indexerAliasName).
+ Add(reqs...).
+ Do(context.Background())
+ return err
+ }
+ return nil
+}
+
+// Delete deletes indexes by ids
+func (b *ElasticSearchIndexer) Delete(repoID int64) error {
+ _, err := b.client.DeleteByQuery(b.indexerAliasName).
+ Query(elastic.NewTermsQuery("repo_id", repoID)).
+ Do(context.Background())
+ return err
+}
+
+func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) {
+ hits := make([]*SearchResult, 0, pageSize)
+ for _, hit := range searchResult.Hits.Hits {
+ // FIXME: There is no way to get the position the keyword on the content currently on the same request.
+ // So we get it from content, this may made the query slower. See
+ // https://discuss.elastic.co/t/fetching-position-of-keyword-in-matched-document/94291
+ var startIndex, endIndex int = -1, -1
+ c, ok := hit.Highlight["content"]
+ if ok && len(c) > 0 {
+ var subStr = make([]rune, 0, len(kw))
+ startIndex = strings.IndexFunc(c[0], func(r rune) bool {
+ if len(subStr) >= len(kw) {
+ subStr = subStr[1:]
+ }
+ subStr = append(subStr, r)
+ return strings.EqualFold(kw, string(subStr))
+ })
+ if startIndex > -1 {
+ endIndex = startIndex + len(kw)
+ } else {
+ panic(fmt.Sprintf("1===%#v", hit.Highlight))
+ }
+ } else {
+ panic(fmt.Sprintf("2===%#v", hit.Highlight))
+ }
+
+ repoID, fileName := parseIndexerID(hit.Id)
+ var res = make(map[string]interface{})
+ if err := json.Unmarshal(hit.Source, &res); err != nil {
+ return 0, nil, nil, err
+ }
+
+ language := res["language"].(string)
+
+ hits = append(hits, &SearchResult{
+ RepoID: repoID,
+ Filename: fileName,
+ CommitID: res["commit_id"].(string),
+ Content: res["content"].(string),
+ UpdatedUnix: timeutil.TimeStamp(res["updated_at"].(float64)),
+ Language: language,
+ StartIndex: startIndex,
+ EndIndex: endIndex,
+ Color: enry.GetColor(language),
+ })
+ }
+
+ return searchResult.TotalHits(), hits, extractAggs(searchResult), nil
+}
+
+func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
+ var searchResultLanguages []*SearchResultLanguages
+ agg, found := searchResult.Aggregations.Terms("language")
+ if found {
+ searchResultLanguages = make([]*SearchResultLanguages, 0, 10)
+
+ for _, bucket := range agg.Buckets {
+ searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{
+ Language: bucket.Key.(string),
+ Color: enry.GetColor(bucket.Key.(string)),
+ Count: int(bucket.DocCount),
+ })
+ }
+ }
+ return searchResultLanguages
+}
+
+// Search searches for codes and language stats by given conditions.
+func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) {
+ kwQuery := elastic.NewMultiMatchQuery(keyword, "content")
+ query := elastic.NewBoolQuery()
+ query = query.Must(kwQuery)
+ if len(repoIDs) > 0 {
+ var repoStrs = make([]interface{}, 0, len(repoIDs))
+ for _, repoID := range repoIDs {
+ repoStrs = append(repoStrs, repoID)
+ }
+ repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
+ query = query.Must(repoQuery)
+ }
+
+ var (
+ start int
+ kw = "<em>" + keyword + "</em>"
+ aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc()
+ )
+
+ if page > 0 {
+ start = (page - 1) * pageSize
+ }
+
+ if len(language) == 0 {
+ searchResult, err := b.client.Search().
+ Index(b.indexerAliasName).
+ Aggregation("language", aggregation).
+ Query(query).
+ Highlight(elastic.NewHighlight().Field("content")).
+ Sort("repo_id", true).
+ From(start).Size(pageSize).
+ Do(context.Background())
+ if err != nil {
+ return 0, nil, nil, err
+ }
+
+ return convertResult(searchResult, kw, pageSize)
+ }
+
+ langQuery := elastic.NewMatchQuery("language", language)
+ countResult, err := b.client.Search().
+ Index(b.indexerAliasName).
+ Aggregation("language", aggregation).
+ Query(query).
+ Size(0). // We only needs stats information
+ Do(context.Background())
+ if err != nil {
+ return 0, nil, nil, err
+ }
+
+ query = query.Must(langQuery)
+ searchResult, err := b.client.Search().
+ Index(b.indexerAliasName).
+ Query(query).
+ Highlight(elastic.NewHighlight().Field("content")).
+ Sort("repo_id", true).
+ From(start).Size(pageSize).
+ Do(context.Background())
+ if err != nil {
+ return 0, nil, nil, err
+ }
+
+ total, hits, _, err := convertResult(searchResult, kw, pageSize)
+
+ return total, hits, extractAggs(countResult), err
+}
+
+// Close implements indexer
+func (b *ElasticSearchIndexer) Close() {}
diff --git a/modules/indexer/code/elastic_search_test.go b/modules/indexer/code/elastic_search_test.go
new file mode 100644
index 0000000000..a230939746
--- /dev/null
+++ b/modules/indexer/code/elastic_search_test.go
@@ -0,0 +1,36 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package code
+
+import (
+ "os"
+ "testing"
+
+ "code.gitea.io/gitea/models"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestESIndexAndSearch(t *testing.T) {
+ models.PrepareTestEnv(t)
+
+ u := os.Getenv("TEST_INDEXER_CODE_ES_URL")
+ if u == "" {
+ t.SkipNow()
+ return
+ }
+
+ indexer, _, err := NewElasticSearchIndexer(u, "gitea_codes")
+ if err != nil {
+ assert.Fail(t, "Unable to create ES indexer Error: %v", err)
+ if indexer != nil {
+ indexer.Close()
+ }
+ return
+ }
+ defer indexer.Close()
+
+ testIndexer("elastic_search", t, indexer)
+}
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index a0f91ce4b5..468955cd89 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -7,8 +7,11 @@ package code
import (
"context"
"os"
+ "strconv"
+ "strings"
"time"
+ "code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@@ -37,12 +40,33 @@ type SearchResultLanguages struct {
// Indexer defines an interface to indexer issues contents
type Indexer interface {
- Index(repoID int64) error
+ Index(repo *models.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
Search(repoIDs []int64, language, keyword string, page, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error)
Close()
}
+func filenameIndexerID(repoID int64, filename string) string {
+ return indexerID(repoID) + "_" + filename
+}
+
+func parseIndexerID(indexerID string) (int64, string) {
+ index := strings.IndexByte(indexerID, '_')
+ if index == -1 {
+ log.Error("Unexpected ID in repo indexer: %s", indexerID)
+ }
+ repoID, _ := strconv.ParseInt(indexerID[:index], 10, 64)
+ return repoID, indexerID[index+1:]
+}
+
+func filenameOfIndexerID(indexerID string) string {
+ index := strings.IndexByte(indexerID, '_')
+ if index == -1 {
+ log.Error("Unexpected ID in repo indexer: %s", indexerID)
+ }
+ return indexerID[index+1:]
+}
+
// Init initialize the repo indexer
func Init() {
if !setting.Indexer.RepoIndexerEnabled {
@@ -63,33 +87,61 @@ func Init() {
waitChannel := make(chan time.Duration)
go func() {
start := time.Now()
- log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath)
- defer func() {
- if err := recover(); err != nil {
- log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
- log.Error("The indexer files are likely corrupted and may need to be deleted")
- log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.RepoPath)
+ var (
+ rIndexer Indexer
+ populate bool
+ err error
+ )
+ switch setting.Indexer.RepoType {
+ case "bleve":
+ log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath)
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
+ log.Error("The indexer files are likely corrupted and may need to be deleted")
+ log.Error("You can completely remove the \"%s\" directory to make Gitea recreate the indexes", setting.Indexer.RepoPath)
+ }
+ }()
+
+ rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath)
+ if err != nil {
+ if rIndexer != nil {
+ rIndexer.Close()
+ }
cancel()
indexer.Close()
close(waitChannel)
- log.Fatal("PID: %d Unable to initialize the Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
+ log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
}
- }()
- bleveIndexer, created, err := NewBleveIndexer(setting.Indexer.RepoPath)
- if err != nil {
- if bleveIndexer != nil {
- bleveIndexer.Close()
+ case "elasticsearch":
+ log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoConnStr)
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
+ log.Error("The indexer files are likely corrupted and may need to be deleted")
+ log.Error("You can completely remove the \"%s\" index to make Gitea recreate the indexes", setting.Indexer.RepoConnStr)
+ }
+ }()
+
+ rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName)
+ if err != nil {
+ if rIndexer != nil {
+ rIndexer.Close()
+ }
+ cancel()
+ indexer.Close()
+ close(waitChannel)
+ log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err)
}
- cancel()
- indexer.Close()
- close(waitChannel)
- log.Fatal("PID: %d Unable to initialize the Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
+ default:
+ log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType)
}
- indexer.set(bleveIndexer)
+
+ indexer.set(rIndexer)
go processRepoIndexerOperationQueue(indexer)
- if created {
+ if populate {
go populateRepoIndexer()
}
select {
diff --git a/modules/indexer/code/indexer_test.go b/modules/indexer/code/indexer_test.go
new file mode 100644
index 0000000000..0b4851a48a
--- /dev/null
+++ b/modules/indexer/code/indexer_test.go
@@ -0,0 +1,83 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package code
+
+import (
+ "path/filepath"
+ "testing"
+
+ "code.gitea.io/gitea/models"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestMain(m *testing.M) {
+ models.MainTest(m, filepath.Join("..", "..", ".."))
+}
+
+func testIndexer(name string, t *testing.T, indexer Indexer) {
+ t.Run(name, func(t *testing.T) {
+ var repoID int64 = 1
+ err := index(indexer, repoID)
+ assert.NoError(t, err)
+ var (
+ keywords = []struct {
+ RepoIDs []int64
+ Keyword string
+ IDs []int64
+ Langs int
+ }{
+ {
+ RepoIDs: nil,
+ Keyword: "Description",
+ IDs: []int64{repoID},
+ Langs: 1,
+ },
+ {
+ RepoIDs: []int64{2},
+ Keyword: "Description",
+ IDs: []int64{},
+ Langs: 0,
+ },
+ {
+ RepoIDs: nil,
+ Keyword: "repo1",
+ IDs: []int64{repoID},
+ Langs: 1,
+ },
+ {
+ RepoIDs: []int64{2},
+ Keyword: "repo1",
+ IDs: []int64{},
+ Langs: 0,
+ },
+ {
+ RepoIDs: nil,
+ Keyword: "non-exist",
+ IDs: []int64{},
+ Langs: 0,
+ },
+ }
+ )
+
+ for _, kw := range keywords {
+ t.Run(kw.Keyword, func(t *testing.T) {
+ total, res, langs, err := indexer.Search(kw.RepoIDs, "", kw.Keyword, 1, 10)
+ assert.NoError(t, err)
+ assert.EqualValues(t, len(kw.IDs), total)
+ assert.EqualValues(t, kw.Langs, len(langs))
+
+ var ids = make([]int64, 0, len(res))
+ for _, hit := range res {
+ ids = append(ids, hit.RepoID)
+ assert.EqualValues(t, "# repo1\n\nDescription for repo1", hit.Content)
+ }
+ assert.EqualValues(t, kw.IDs, ids)
+ })
+ }
+
+ assert.NoError(t, indexer.Delete(repoID))
+ })
+}
diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go
index 94675559ea..844003e1fc 100644
--- a/modules/indexer/code/queue.go
+++ b/modules/indexer/code/queue.go
@@ -10,7 +10,6 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/setting"
)
type repoIndexerOperation struct {
@@ -25,6 +24,30 @@ func initQueue(queueLength int) {
repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
}
+func index(indexer Indexer, repoID int64) error {
+ repo, err := models.GetRepositoryByID(repoID)
+ if err != nil {
+ return err
+ }
+
+ sha, err := getDefaultBranchSha(repo)
+ if err != nil {
+ return err
+ }
+ changes, err := getRepoChanges(repo, sha)
+ if err != nil {
+ return err
+ } else if changes == nil {
+ return nil
+ }
+
+ if err := indexer.Index(repo, sha, changes); err != nil {
+ return err
+ }
+
+ return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
+}
+
func processRepoIndexerOperationQueue(indexer Indexer) {
for {
select {
@@ -35,7 +58,7 @@ func processRepoIndexerOperationQueue(indexer Indexer) {
log.Error("indexer.Delete: %v", err)
}
} else {
- if err = indexer.Index(op.repoID); err != nil {
+ if err = index(indexer, op.repoID); err != nil {
log.Error("indexer.Index: %v", err)
}
}
@@ -60,9 +83,6 @@ func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
}
func addOperationToQueue(op repoIndexerOperation) {
- if !setting.Indexer.RepoIndexerEnabled {
- return
- }
select {
case repoIndexerOperationQueue <- op:
break
diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go
index 926597a382..d839544874 100644
--- a/modules/indexer/code/wrapped.go
+++ b/modules/indexer/code/wrapped.go
@@ -7,6 +7,8 @@ package code
import (
"fmt"
"sync"
+
+ "code.gitea.io/gitea/models"
)
var (
@@ -55,12 +57,12 @@ func (w *wrappedIndexer) get() (Indexer, error) {
return w.internal, nil
}
-func (w *wrappedIndexer) Index(repoID int64) error {
+func (w *wrappedIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
indexer, err := w.get()
if err != nil {
return err
}
- return indexer.Index(repoID)
+ return indexer.Index(repo, sha, changes)
}
func (w *wrappedIndexer) Delete(repoID int64) error {