diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2020-08-31 00:08:01 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-30 19:08:01 +0300 |
commit | 9bc69ff26eeebaf3b622d62d18c757ff1f401dda (patch) | |
tree | 69ff71d9d460e83a6fff54b172b604732ab5d065 /modules/indexer | |
parent | d257485bc0026c9717fe7bf4c9953ad1b7a1a9ae (diff) | |
download | gitea-9bc69ff26eeebaf3b622d62d18c757ff1f401dda.tar.gz gitea-9bc69ff26eeebaf3b622d62d18c757ff1f401dda.zip |
Support elastic search for code search (#10273)
* Support elastic search for code search
* Finished elastic search implementation and add some tests
* Enable test on drone and added docs
* Add new fields to elastic search
* Fix bug
* remove unused changes
* Use indexer alias to keep the gitea indexer version
* Improve codes
* Some code improvements
* The real indexer name changed to xxx.v1
Co-authored-by: zeripath <art27@cantab.net>
Diffstat (limited to 'modules/indexer')
-rw-r--r-- | modules/indexer/code/bleve.go | 132 | ||||
-rw-r--r-- | modules/indexer/code/bleve_test.go | 53 | ||||
-rw-r--r-- | modules/indexer/code/elastic_search.go | 385 | ||||
-rw-r--r-- | modules/indexer/code/elastic_search_test.go | 36 | ||||
-rw-r--r-- | modules/indexer/code/indexer.go | 90 | ||||
-rw-r--r-- | modules/indexer/code/indexer_test.go | 83 | ||||
-rw-r--r-- | modules/indexer/code/queue.go | 30 | ||||
-rw-r--r-- | modules/indexer/code/wrapped.go | 6 |
8 files changed, 658 insertions, 157 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 6502259ba4..81373bf3da 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -58,10 +58,10 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { }) } -// openIndexer open the index at the specified path, checking for metadata +// openBleveIndexer open the index at the specified path, checking for metadata // updates and bleve version updates. If index needs to be created (or // re-created), returns (nil, nil) -func openIndexer(path string, latestVersion int) (bleve.Index, error) { +func openBleveIndexer(path string, latestVersion int) (bleve.Index, error) { _, err := os.Stat(path) if err != nil && os.IsNotExist(err) { return nil, nil @@ -104,54 +104,14 @@ func (d *RepoIndexerData) Type() string { return repoIndexerDocType } -func addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { - // Ignore vendored files in code search - if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { - return nil - } - stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). - RunInDir(repo.RepoPath()) - if err != nil { - return err - } - if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil { - return fmt.Errorf("Misformatted git cat-file output: %v", err) - } else if int64(size) > setting.Indexer.MaxIndexerFileSize { - return addDelete(update.Filename, repo, batch) - } - - fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). - RunInDirBytes(repo.RepoPath()) - if err != nil { - return err - } else if !base.IsTextFile(fileContents) { - // FIXME: UTF-16 files will probably fail here - return nil - } - - id := filenameIndexerID(repo.ID, update.Filename) - return batch.Index(id, &RepoIndexerData{ - RepoID: repo.ID, - CommitID: commitSha, - Content: string(charset.ToUTF8DropErrors(fileContents)), - Language: analyze.GetCodeLanguage(update.Filename, fileContents), - UpdatedAt: time.Now().UTC(), - }) -} - -func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { - id := filenameIndexerID(repo.ID, filename) - return batch.Delete(id) -} - const ( repoIndexerAnalyzer = "repoIndexerAnalyzer" repoIndexerDocType = "repoIndexerDocType" repoIndexerLatestVersion = 5 ) -// createRepoIndexer create a repo indexer if one does not already exist -func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) { +// createBleveIndexer create a bleve repo indexer if one does not already exist +func createBleveIndexer(path string, latestVersion int) (bleve.Index, error) { docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -199,18 +159,6 @@ func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) { return indexer, nil } -func filenameIndexerID(repoID int64, filename string) string { - return indexerID(repoID) + "_" + filename -} - -func filenameOfIndexerID(indexerID string) string { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - return indexerID[index+1:] -} - var ( _ Indexer = &BleveIndexer{} ) @@ -230,10 +178,51 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { return indexer, created, err } +func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { + // Ignore vendored files in code search + if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { + return nil + } + + stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). + RunInDir(repo.RepoPath()) + if err != nil { + return err + } + if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil { + return fmt.Errorf("Misformatted git cat-file output: %v", err) + } else if int64(size) > setting.Indexer.MaxIndexerFileSize { + return b.addDelete(update.Filename, repo, batch) + } + + fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). + RunInDirBytes(repo.RepoPath()) + if err != nil { + return err + } else if !base.IsTextFile(fileContents) { + // FIXME: UTF-16 files will probably fail here + return nil + } + + id := filenameIndexerID(repo.ID, update.Filename) + return batch.Index(id, &RepoIndexerData{ + RepoID: repo.ID, + CommitID: commitSha, + Content: string(charset.ToUTF8DropErrors(fileContents)), + Language: analyze.GetCodeLanguage(update.Filename, fileContents), + UpdatedAt: time.Now().UTC(), + }) +} + +func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { + id := filenameIndexerID(repo.ID, filename) + return batch.Delete(id) +} + // init init the indexer func (b *BleveIndexer) init() (bool, error) { var err error - b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion) + b.indexer, err = openBleveIndexer(b.indexDir, repoIndexerLatestVersion) if err != nil { return false, err } @@ -241,7 +230,7 @@ func (b *BleveIndexer) init() (bool, error) { return false, nil } - b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion) + b.indexer, err = createBleveIndexer(b.indexDir, repoIndexerLatestVersion) if err != nil { return false, err } @@ -262,38 +251,19 @@ func (b *BleveIndexer) Close() { } // Index indexes the data -func (b *BleveIndexer) Index(repoID int64) error { - repo, err := models.GetRepositoryByID(repoID) - if err != nil { - return err - } - - sha, err := getDefaultBranchSha(repo) - if err != nil { - return err - } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return err - } else if changes == nil { - return nil - } - +func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) for _, update := range changes.Updates { - if err := addUpdate(sha, update, repo, batch); err != nil { + if err := b.addUpdate(sha, update, repo, batch); err != nil { return err } } for _, filename := range changes.RemovedFilenames { - if err := addDelete(filename, repo, batch); err != nil { + if err := b.addDelete(filename, repo, batch); err != nil { return err } } - if err = batch.Flush(); err != nil { - return err - } - return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) + return batch.Flush() } // Delete deletes indexes by ids diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go index 2b3128ac88..f79957220f 100644 --- a/modules/indexer/code/bleve_test.go +++ b/modules/indexer/code/bleve_test.go @@ -6,21 +6,15 @@ package code import ( "io/ioutil" - "path/filepath" "testing" "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" ) -func TestMain(m *testing.M) { - models.MainTest(m, filepath.Join("..", "..", "..")) -} - -func TestIndexAndSearch(t *testing.T) { +func TestBleveIndexAndSearch(t *testing.T) { models.PrepareTestEnv(t) dir, err := ioutil.TempDir("", "bleve.index") @@ -31,10 +25,9 @@ func TestIndexAndSearch(t *testing.T) { } defer util.RemoveAll(dir) - setting.Indexer.RepoIndexerEnabled = true idx, _, err := NewBleveIndexer(dir) if err != nil { - assert.Fail(t, "Unable to create indexer Error: %v", err) + assert.Fail(t, "Unable to create bleve indexer Error: %v", err) if idx != nil { idx.Close() } @@ -42,45 +35,5 @@ func TestIndexAndSearch(t *testing.T) { } defer idx.Close() - err = idx.Index(1) - assert.NoError(t, err) - - var ( - keywords = []struct { - Keyword string - IDs []int64 - Langs int - }{ - { - Keyword: "Description", - IDs: []int64{1}, - Langs: 1, - }, - { - Keyword: "repo1", - IDs: []int64{1}, - Langs: 1, - }, - { - Keyword: "non-exist", - IDs: []int64{}, - Langs: 0, - }, - } - ) - - for _, kw := range keywords { - total, res, langs, err := idx.Search(nil, "", kw.Keyword, 1, 10) - assert.NoError(t, err) - assert.EqualValues(t, len(kw.IDs), total) - - assert.NotNil(t, langs) - assert.Len(t, langs, kw.Langs) - - var ids = make([]int64, 0, len(res)) - for _, hit := range res { - ids = append(ids, hit.RepoID) - } - assert.EqualValues(t, kw.IDs, ids) - } + testIndexer("beleve", t, idx) } diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go new file mode 100644 index 0000000000..4f690ed806 --- /dev/null +++ b/modules/indexer/code/elastic_search.go @@ -0,0 +1,385 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/analyze" + "code.gitea.io/gitea/modules/base" + "code.gitea.io/gitea/modules/charset" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + + "github.com/go-enry/go-enry/v2" + "github.com/olivere/elastic/v7" +) + +const ( + esRepoIndexerLatestVersion = 1 +) + +var ( + _ Indexer = &ElasticSearchIndexer{} +) + +// ElasticSearchIndexer implements Indexer interface +type ElasticSearchIndexer struct { + client *elastic.Client + indexerAliasName string +} + +type elasticLogger struct { + *log.Logger +} + +func (l elasticLogger) Printf(format string, args ...interface{}) { + _ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...) +} + +// NewElasticSearchIndexer creates a new elasticsearch indexer +func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bool, error) { + opts := []elastic.ClientOptionFunc{ + elastic.SetURL(url), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(10 * time.Second), + elastic.SetGzip(false), + } + + logger := elasticLogger{log.GetLogger(log.DEFAULT)} + + if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG { + opts = append(opts, elastic.SetTraceLog(logger)) + } else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL { + opts = append(opts, elastic.SetErrorLog(logger)) + } else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN { + opts = append(opts, elastic.SetInfoLog(logger)) + } + + client, err := elastic.NewClient(opts...) + if err != nil { + return nil, false, err + } + + indexer := &ElasticSearchIndexer{ + client: client, + indexerAliasName: indexerName, + } + exists, err := indexer.init() + + return indexer, !exists, err +} + +const ( + defaultMapping = `{ + "mappings": { + "properties": { + "repo_id": { + "type": "long", + "index": true + }, + "content": { + "type": "text", + "index": true + }, + "commit_id": { + "type": "keyword", + "index": true + }, + "language": { + "type": "keyword", + "index": true + }, + "updated_at": { + "type": "long", + "index": true + } + } + } + }` +) + +func (b *ElasticSearchIndexer) realIndexerName() string { + return fmt.Sprintf("%s.v%d", b.indexerAliasName, esRepoIndexerLatestVersion) +} + +// Init will initialize the indexer +func (b *ElasticSearchIndexer) init() (bool, error) { + ctx := context.Background() + exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx) + if err != nil { + return false, err + } + if !exists { + var mapping = defaultMapping + + createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx) + if err != nil { + return false, err + } + if !createIndex.Acknowledged { + return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping) + } + } + + // check version + r, err := b.client.Aliases().Do(ctx) + if err != nil { + return false, err + } + + realIndexerNames := r.IndicesByAlias(b.indexerAliasName) + if len(realIndexerNames) < 1 { + res, err := b.client.Alias(). + Add(b.realIndexerName(), b.indexerAliasName). + Do(ctx) + if err != nil { + return false, err + } + if !res.Acknowledged { + return false, fmt.Errorf("") + } + } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() { + log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", + realIndexerNames[0], b.realIndexerName()) + res, err := b.client.Alias(). + Remove(realIndexerNames[0], b.indexerAliasName). + Add(b.realIndexerName(), b.indexerAliasName). + Do(ctx) + if err != nil { + return false, err + } + if !res.Acknowledged { + return false, fmt.Errorf("") + } + } + + return exists, nil +} + +func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { + stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). + RunInDir(repo.RepoPath()) + if err != nil { + return nil, err + } + if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil { + return nil, fmt.Errorf("Misformatted git cat-file output: %v", err) + } else if int64(size) > setting.Indexer.MaxIndexerFileSize { + return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil + } + + fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). + RunInDirBytes(repo.RepoPath()) + if err != nil { + return nil, err + } else if !base.IsTextFile(fileContents) { + // FIXME: UTF-16 files will probably fail here + return nil, nil + } + + id := filenameIndexerID(repo.ID, update.Filename) + + return []elastic.BulkableRequest{ + elastic.NewBulkIndexRequest(). + Index(b.indexerAliasName). + Id(id). + Doc(map[string]interface{}{ + "repo_id": repo.ID, + "content": string(charset.ToUTF8DropErrors(fileContents)), + "commit_id": sha, + "language": analyze.GetCodeLanguage(update.Filename, fileContents), + "updated_at": timeutil.TimeStampNow(), + }), + }, nil +} + +func (b *ElasticSearchIndexer) addDelete(filename string, repo *models.Repository) elastic.BulkableRequest { + id := filenameIndexerID(repo.ID, filename) + return elastic.NewBulkDeleteRequest(). + Index(b.indexerAliasName). + Id(id) +} + +// Index will save the index data +func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { + reqs := make([]elastic.BulkableRequest, 0) + for _, update := range changes.Updates { + updateReqs, err := b.addUpdate(sha, update, repo) + if err != nil { + return err + } + if len(updateReqs) > 0 { + reqs = append(reqs, updateReqs...) + } + } + + for _, filename := range changes.RemovedFilenames { + reqs = append(reqs, b.addDelete(filename, repo)) + } + + if len(reqs) > 0 { + _, err := b.client.Bulk(). + Index(b.indexerAliasName). + Add(reqs...). + Do(context.Background()) + return err + } + return nil +} + +// Delete deletes indexes by ids +func (b *ElasticSearchIndexer) Delete(repoID int64) error { + _, err := b.client.DeleteByQuery(b.indexerAliasName). + Query(elastic.NewTermsQuery("repo_id", repoID)). + Do(context.Background()) + return err +} + +func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) { + hits := make([]*SearchResult, 0, pageSize) + for _, hit := range searchResult.Hits.Hits { + // FIXME: There is no way to get the position the keyword on the content currently on the same request. + // So we get it from content, this may made the query slower. See + // https://discuss.elastic.co/t/fetching-position-of-keyword-in-matched-document/94291 + var startIndex, endIndex int = -1, -1 + c, ok := hit.Highlight["content"] + if ok && len(c) > 0 { + var subStr = make([]rune, 0, len(kw)) + startIndex = strings.IndexFunc(c[0], func(r rune) bool { + if len(subStr) >= len(kw) { + subStr = subStr[1:] + } + subStr = append(subStr, r) + return strings.EqualFold(kw, string(subStr)) + }) + if startIndex > -1 { + endIndex = startIndex + len(kw) + } else { + panic(fmt.Sprintf("1===%#v", hit.Highlight)) + } + } else { + panic(fmt.Sprintf("2===%#v", hit.Highlight)) + } + + repoID, fileName := parseIndexerID(hit.Id) + var res = make(map[string]interface{}) + if err := json.Unmarshal(hit.Source, &res); err != nil { + return 0, nil, nil, err + } + + language := res["language"].(string) + + hits = append(hits, &SearchResult{ + RepoID: repoID, + Filename: fileName, + CommitID: res["commit_id"].(string), + Content: res["content"].(string), + UpdatedUnix: timeutil.TimeStamp(res["updated_at"].(float64)), + Language: language, + StartIndex: startIndex, + EndIndex: endIndex, + Color: enry.GetColor(language), + }) + } + + return searchResult.TotalHits(), hits, extractAggs(searchResult), nil +} + +func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages { + var searchResultLanguages []*SearchResultLanguages + agg, found := searchResult.Aggregations.Terms("language") + if found { + searchResultLanguages = make([]*SearchResultLanguages, 0, 10) + + for _, bucket := range agg.Buckets { + searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{ + Language: bucket.Key.(string), + Color: enry.GetColor(bucket.Key.(string)), + Count: int(bucket.DocCount), + }) + } + } + return searchResultLanguages +} + +// Search searches for codes and language stats by given conditions. +func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) { + kwQuery := elastic.NewMultiMatchQuery(keyword, "content") + query := elastic.NewBoolQuery() + query = query.Must(kwQuery) + if len(repoIDs) > 0 { + var repoStrs = make([]interface{}, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoStrs = append(repoStrs, repoID) + } + repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...) + query = query.Must(repoQuery) + } + + var ( + start int + kw = "<em>" + keyword + "</em>" + aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc() + ) + + if page > 0 { + start = (page - 1) * pageSize + } + + if len(language) == 0 { + searchResult, err := b.client.Search(). + Index(b.indexerAliasName). + Aggregation("language", aggregation). + Query(query). + Highlight(elastic.NewHighlight().Field("content")). + Sort("repo_id", true). + From(start).Size(pageSize). + Do(context.Background()) + if err != nil { + return 0, nil, nil, err + } + + return convertResult(searchResult, kw, pageSize) + } + + langQuery := elastic.NewMatchQuery("language", language) + countResult, err := b.client.Search(). + Index(b.indexerAliasName). + Aggregation("language", aggregation). + Query(query). + Size(0). // We only needs stats information + Do(context.Background()) + if err != nil { + return 0, nil, nil, err + } + + query = query.Must(langQuery) + searchResult, err := b.client.Search(). + Index(b.indexerAliasName). + Query(query). + Highlight(elastic.NewHighlight().Field("content")). + Sort("repo_id", true). + From(start).Size(pageSize). + Do(context.Background()) + if err != nil { + return 0, nil, nil, err + } + + total, hits, _, err := convertResult(searchResult, kw, pageSize) + + return total, hits, extractAggs(countResult), err +} + +// Close implements indexer +func (b *ElasticSearchIndexer) Close() {} diff --git a/modules/indexer/code/elastic_search_test.go b/modules/indexer/code/elastic_search_test.go new file mode 100644 index 0000000000..a230939746 --- /dev/null +++ b/modules/indexer/code/elastic_search_test.go @@ -0,0 +1,36 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "os" + "testing" + + "code.gitea.io/gitea/models" + + "github.com/stretchr/testify/assert" +) + +func TestESIndexAndSearch(t *testing.T) { + models.PrepareTestEnv(t) + + u := os.Getenv("TEST_INDEXER_CODE_ES_URL") + if u == "" { + t.SkipNow() + return + } + + indexer, _, err := NewElasticSearchIndexer(u, "gitea_codes") + if err != nil { + assert.Fail(t, "Unable to create ES indexer Error: %v", err) + if indexer != nil { + indexer.Close() + } + return + } + defer indexer.Close() + + testIndexer("elastic_search", t, indexer) +} diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index a0f91ce4b5..468955cd89 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -7,8 +7,11 @@ package code import ( "context" "os" + "strconv" + "strings" "time" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -37,12 +40,33 @@ type SearchResultLanguages struct { // Indexer defines an interface to indexer issues contents type Indexer interface { - Index(repoID int64) error + Index(repo *models.Repository, sha string, changes *repoChanges) error Delete(repoID int64) error Search(repoIDs []int64, language, keyword string, page, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) Close() } +func filenameIndexerID(repoID int64, filename string) string { + return indexerID(repoID) + "_" + filename +} + +func parseIndexerID(indexerID string) (int64, string) { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + repoID, _ := strconv.ParseInt(indexerID[:index], 10, 64) + return repoID, indexerID[index+1:] +} + +func filenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} + // Init initialize the repo indexer func Init() { if !setting.Indexer.RepoIndexerEnabled { @@ -63,33 +87,61 @@ func Init() { waitChannel := make(chan time.Duration) go func() { start := time.Now() - log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath) - defer func() { - if err := recover(); err != nil { - log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2)) - log.Error("The indexer files are likely corrupted and may need to be deleted") - log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.RepoPath) + var ( + rIndexer Indexer + populate bool + err error + ) + switch setting.Indexer.RepoType { + case "bleve": + log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath) + defer func() { + if err := recover(); err != nil { + log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2)) + log.Error("The indexer files are likely corrupted and may need to be deleted") + log.Error("You can completely remove the \"%s\" directory to make Gitea recreate the indexes", setting.Indexer.RepoPath) + } + }() + + rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath) + if err != nil { + if rIndexer != nil { + rIndexer.Close() + } cancel() indexer.Close() close(waitChannel) - log.Fatal("PID: %d Unable to initialize the Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) + log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) } - }() - bleveIndexer, created, err := NewBleveIndexer(setting.Indexer.RepoPath) - if err != nil { - if bleveIndexer != nil { - bleveIndexer.Close() + case "elasticsearch": + log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoConnStr) + defer func() { + if err := recover(); err != nil { + log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2)) + log.Error("The indexer files are likely corrupted and may need to be deleted") + log.Error("You can completely remove the \"%s\" index to make Gitea recreate the indexes", setting.Indexer.RepoConnStr) + } + }() + + rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + if err != nil { + if rIndexer != nil { + rIndexer.Close() + } + cancel() + indexer.Close() + close(waitChannel) + log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) } - cancel() - indexer.Close() - close(waitChannel) - log.Fatal("PID: %d Unable to initialize the Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) + default: + log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType) } - indexer.set(bleveIndexer) + + indexer.set(rIndexer) go processRepoIndexerOperationQueue(indexer) - if created { + if populate { go populateRepoIndexer() } select { diff --git a/modules/indexer/code/indexer_test.go b/modules/indexer/code/indexer_test.go new file mode 100644 index 0000000000..0b4851a48a --- /dev/null +++ b/modules/indexer/code/indexer_test.go @@ -0,0 +1,83 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "path/filepath" + "testing" + + "code.gitea.io/gitea/models" + + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + models.MainTest(m, filepath.Join("..", "..", "..")) +} + +func testIndexer(name string, t *testing.T, indexer Indexer) { + t.Run(name, func(t *testing.T) { + var repoID int64 = 1 + err := index(indexer, repoID) + assert.NoError(t, err) + var ( + keywords = []struct { + RepoIDs []int64 + Keyword string + IDs []int64 + Langs int + }{ + { + RepoIDs: nil, + Keyword: "Description", + IDs: []int64{repoID}, + Langs: 1, + }, + { + RepoIDs: []int64{2}, + Keyword: "Description", + IDs: []int64{}, + Langs: 0, + }, + { + RepoIDs: nil, + Keyword: "repo1", + IDs: []int64{repoID}, + Langs: 1, + }, + { + RepoIDs: []int64{2}, + Keyword: "repo1", + IDs: []int64{}, + Langs: 0, + }, + { + RepoIDs: nil, + Keyword: "non-exist", + IDs: []int64{}, + Langs: 0, + }, + } + ) + + for _, kw := range keywords { + t.Run(kw.Keyword, func(t *testing.T) { + total, res, langs, err := indexer.Search(kw.RepoIDs, "", kw.Keyword, 1, 10) + assert.NoError(t, err) + assert.EqualValues(t, len(kw.IDs), total) + assert.EqualValues(t, kw.Langs, len(langs)) + + var ids = make([]int64, 0, len(res)) + for _, hit := range res { + ids = append(ids, hit.RepoID) + assert.EqualValues(t, "# repo1\n\nDescription for repo1", hit.Content) + } + assert.EqualValues(t, kw.IDs, ids) + }) + } + + assert.NoError(t, indexer.Delete(repoID)) + }) +} diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go index 94675559ea..844003e1fc 100644 --- a/modules/indexer/code/queue.go +++ b/modules/indexer/code/queue.go @@ -10,7 +10,6 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" ) type repoIndexerOperation struct { @@ -25,6 +24,30 @@ func initQueue(queueLength int) { repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength) } +func index(indexer Indexer, repoID int64) error { + repo, err := models.GetRepositoryByID(repoID) + if err != nil { + return err + } + + sha, err := getDefaultBranchSha(repo) + if err != nil { + return err + } + changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil + } + + if err := indexer.Index(repo, sha, changes); err != nil { + return err + } + + return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) +} + func processRepoIndexerOperationQueue(indexer Indexer) { for { select { @@ -35,7 +58,7 @@ func processRepoIndexerOperationQueue(indexer Indexer) { log.Error("indexer.Delete: %v", err) } } else { - if err = indexer.Index(op.repoID); err != nil { + if err = index(indexer, op.repoID); err != nil { log.Error("indexer.Index: %v", err) } } @@ -60,9 +83,6 @@ func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { } func addOperationToQueue(op repoIndexerOperation) { - if !setting.Indexer.RepoIndexerEnabled { - return - } select { case repoIndexerOperationQueue <- op: break diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go index 926597a382..d839544874 100644 --- a/modules/indexer/code/wrapped.go +++ b/modules/indexer/code/wrapped.go @@ -7,6 +7,8 @@ package code import ( "fmt" "sync" + + "code.gitea.io/gitea/models" ) var ( @@ -55,12 +57,12 @@ func (w *wrappedIndexer) get() (Indexer, error) { return w.internal, nil } -func (w *wrappedIndexer) Index(repoID int64) error { +func (w *wrappedIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { indexer, err := w.get() if err != nil { return err } - return indexer.Index(repoID) + return indexer.Index(repo, sha, changes) } func (w *wrappedIndexer) Delete(repoID int64) error { |