diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2019-12-10 21:29:40 +0800 |
---|---|---|
committer | Antoine GIRARD <sapk@users.noreply.github.com> | 2019-12-10 14:29:40 +0100 |
commit | 50da9f7daed4fe3e8f0c76f23eeb987e97de4962 (patch) | |
tree | ab524323c8fe0973cb831f930641d241f6d3b1ff /modules/indexer/code | |
parent | 2c83dac5d47195b7589a9e879598d00c00f1b302 (diff) | |
download | gitea-50da9f7daed4fe3e8f0c76f23eeb987e97de4962.tar.gz gitea-50da9f7daed4fe3e8f0c76f23eeb987e97de4962.zip |
Move modules/indexer to modules/indexer/code (#9301)
Diffstat (limited to 'modules/indexer/code')
-rw-r--r-- | modules/indexer/code/bleve.go | 23 |
-rw-r--r-- | modules/indexer/code/indexer.go | 76 |
-rw-r--r-- | modules/indexer/code/repo.go | 257 |
3 files changed, 344 insertions, 12 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 4e7eaa21b7..c2d1ed902f 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -15,7 +15,6 @@ import ( "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "github.com/ethantkoenig/rupture" @@ -39,7 +38,7 @@ func InitRepoIndexer() { go func() { start := time.Now() log.Info("Initializing Repository Indexer") - indexer.InitRepoIndexer(populateRepoIndexerAsynchronously) + initRepoIndexer(populateRepoIndexerAsynchronously) go processRepoIndexerOperationQueue() waitChannel <- time.Since(start) }() @@ -130,7 +129,7 @@ func updateRepoIndexer(repoID int64) error { return nil } - batch := indexer.RepoIndexerBatch() + batch := RepoIndexerBatch() for _, update := range changes.Updates { if err := addUpdate(update, repo, batch); err != nil { return err @@ -198,10 +197,10 @@ func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.Flushin // FIXME: UTF-16 files will probably fail here return nil } - indexerUpdate := indexer.RepoIndexerUpdate{ + indexerUpdate := RepoIndexerUpdate{ Filepath: update.Filename, - Op: indexer.RepoIndexerOpUpdate, - Data: &indexer.RepoIndexerData{ + Op: RepoIndexerOpUpdate, + Data: &RepoIndexerData{ RepoID: repo.ID, Content: string(charset.ToUTF8DropErrors(fileContents)), }, @@ -210,10 +209,10 @@ func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.Flushin } func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { - indexerUpdate := indexer.RepoIndexerUpdate{ + indexerUpdate := RepoIndexerUpdate{ Filepath: filename, - Op: indexer.RepoIndexerOpDelete, - Data: &indexer.RepoIndexerData{ + Op: RepoIndexerOpDelete, + Data: &RepoIndexerData{ RepoID: repo.ID, }, } @@ -279,7 +278,7 @@ func nonGenesisChanges(repo 
*models.Repository, revision string) (*repoChanges, // previous commit sha may have been removed by a force push, so // try rebuilding from scratch log.Warn("git diff: %v", err) - if err = indexer.DeleteRepoFromIndexer(repo.ID); err != nil { + if err = deleteRepoFromIndexer(repo.ID); err != nil { return nil, err } return genesisChanges(repo, revision) @@ -326,8 +325,8 @@ func processRepoIndexerOperationQueue() { op := <-repoIndexerOperationQueue var err error if op.deleted { - if err = indexer.DeleteRepoFromIndexer(op.repoID); err != nil { - log.Error("DeleteRepoFromIndexer: %v", err) + if err = deleteRepoFromIndexer(op.repoID); err != nil { + log.Error("deleteRepoFromIndexer: %v", err) } } else { if err = updateRepoIndexer(op.repoID); err != nil { diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go new file mode 100644 index 0000000000..3907a7b57d --- /dev/null +++ b/modules/indexer/code/indexer.go @@ -0,0 +1,76 @@ +// Copyright 2016 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package code + +import ( + "os" + "strconv" + + "code.gitea.io/gitea/modules/setting" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/mapping" + "github.com/blevesearch/bleve/search/query" + "github.com/ethantkoenig/rupture" +) + +// indexerID a bleve-compatible unique identifier for an integer id +func indexerID(id int64) string { + return strconv.FormatInt(id, 36) +} + +// numericEqualityQuery a numeric equality query for the given value and field +func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q +} + +const unicodeNormalizeName = "unicodeNormalize" + +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) +} + +const maxBatchSize = 16 + +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. 
If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(setting.Indexer.IssuePath) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + metadata, err := rupture.ReadIndexMetadata(path) + if err != nil { + return nil, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) + } + + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err + } + return index, nil +} diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go new file mode 100644 index 0000000000..31f0fa7f3d --- /dev/null +++ b/modules/indexer/code/repo.go @@ -0,0 +1,257 @@ +// Copyright 2017 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package code + +import ( + "strings" + "sync" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/custom" + "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/search/query" + "github.com/ethantkoenig/rupture" +) + +const ( + repoIndexerAnalyzer = "repoIndexerAnalyzer" + repoIndexerDocType = "repoIndexerDocType" + + repoIndexerLatestVersion = 4 +) + +type bleveIndexerHolder struct { + index bleve.Index + mutex sync.RWMutex + cond *sync.Cond +} + +func newBleveIndexerHolder() *bleveIndexerHolder { + b := &bleveIndexerHolder{} + b.cond = sync.NewCond(b.mutex.RLocker()) + return b +} + +func (r *bleveIndexerHolder) set(index bleve.Index) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.index = index + r.cond.Broadcast() +} + +func (r *bleveIndexerHolder) get() bleve.Index { + r.mutex.RLock() + defer r.mutex.RUnlock() + if r.index == nil { + r.cond.Wait() + } + return r.index +} + +// repoIndexer (thread-safe) index for repository contents +var indexerHolder = newBleveIndexerHolder() + +// RepoIndexerOp type of operation to perform on repo indexer +type RepoIndexerOp int + +const ( + // RepoIndexerOpUpdate add/update a file's contents + RepoIndexerOpUpdate = iota + + // RepoIndexerOpDelete delete a file + RepoIndexerOpDelete +) + +// RepoIndexerData data stored in the repo indexer +type RepoIndexerData struct { + RepoID int64 + Content string +} + +// Type returns the document type, for bleve's mapping.Classifier interface. +func (d *RepoIndexerData) Type() string { + return repoIndexerDocType +} + +// RepoIndexerUpdate an update to the repo indexer +type RepoIndexerUpdate struct { + Filepath string + Op RepoIndexerOp + Data *RepoIndexerData +} + +// AddToFlushingBatch adds the update to the given flushing batch. 
+func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { + id := filenameIndexerID(update.Data.RepoID, update.Filepath) + switch update.Op { + case RepoIndexerOpUpdate: + return batch.Index(id, update.Data) + case RepoIndexerOpDelete: + return batch.Delete(id) + default: + log.Error("Unrecognized repo indexer op: %d", update.Op) + } + return nil +} + +// initRepoIndexer initialize repo indexer +func initRepoIndexer(populateIndexer func() error) { + indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + if err != nil { + log.Fatal("InitRepoIndexer: %v", err) + } + if indexer != nil { + indexerHolder.set(indexer) + return + } + + if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { + log.Fatal("CreateRepoIndexer: %v", err) + } + if err = populateIndexer(); err != nil { + log.Fatal("PopulateRepoIndex: %v", err) + } +} + +// createRepoIndexer create a repo indexer if one does not already exist +func createRepoIndexer(path string, latestVersion int) error { + docMapping := bleve.NewDocumentMapping() + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + + mapping := bleve.NewIndexMapping() + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return err + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, lowercase.Name}, + }); err != nil { + return err + } + mapping.DefaultAnalyzer = repoIndexerAnalyzer + mapping.AddDocumentMapping(repoIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) 
+ + indexer, err := bleve.New(path, mapping) + if err != nil { + return err + } + indexerHolder.set(indexer) + + return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + Version: latestVersion, + }) +} + +func filenameIndexerID(repoID int64, filename string) string { + return indexerID(repoID) + "_" + filename +} + +func filenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} + +// RepoIndexerBatch batch to add updates to +func RepoIndexerBatch() rupture.FlushingBatch { + return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) +} + +// deleteRepoFromIndexer delete all of a repo's files from indexer +func deleteRepoFromIndexer(repoID int64) error { + query := numericEqualityQuery(repoID, "RepoID") + searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) + result, err := indexerHolder.get().Search(searchRequest) + if err != nil { + return err + } + batch := RepoIndexerBatch() + for _, hit := range result.Hits { + if err = batch.Delete(hit.ID); err != nil { + return err + } + } + return batch.Flush() +} + +// RepoSearchResult result of performing a search in a repo +type RepoSearchResult struct { + RepoID int64 + StartIndex int + EndIndex int + Filename string + Content string +} + +// SearchRepoByKeyword searches for files in the specified repo. 
+// Returns the matching file-paths +func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (int64, []*RepoSearchResult, error) { + phraseQuery := bleve.NewMatchPhraseQuery(keyword) + phraseQuery.FieldVal = "Content" + phraseQuery.Analyzer = repoIndexerAnalyzer + + var indexerQuery query.Query + if len(repoIDs) > 0 { + var repoQueries = make([]query.Query, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) + } + + indexerQuery = bleve.NewConjunctionQuery( + bleve.NewDisjunctionQuery(repoQueries...), + phraseQuery, + ) + } else { + indexerQuery = phraseQuery + } + + from := (page - 1) * pageSize + searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) + searchRequest.Fields = []string{"Content", "RepoID"} + searchRequest.IncludeLocations = true + + result, err := indexerHolder.get().Search(searchRequest) + if err != nil { + return 0, nil, err + } + + searchResults := make([]*RepoSearchResult, len(result.Hits)) + for i, hit := range result.Hits { + var startIndex, endIndex int = -1, -1 + for _, locations := range hit.Locations["Content"] { + location := locations[0] + locationStart := int(location.Start) + locationEnd := int(location.End) + if startIndex < 0 || locationStart < startIndex { + startIndex = locationStart + } + if endIndex < 0 || locationEnd > endIndex { + endIndex = locationEnd + } + } + searchResults[i] = &RepoSearchResult{ + RepoID: int64(hit.Fields["RepoID"].(float64)), + StartIndex: startIndex, + EndIndex: endIndex, + Filename: filenameOfIndexerID(hit.ID), + Content: hit.Fields["Content"].(string), + } + } + return int64(result.Total), searchResults, nil +} |