diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2019-12-23 20:31:16 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-23 20:31:16 +0800 |
commit | 89b4e0477b4e1b9e1cccd87b68fde4ea8a578e9c (patch) | |
tree | 46141746472fc33dfa8c262fa96176895bc9fba8 /modules/indexer/code/bleve.go | |
parent | 2f9564f993ba02ba503d7088eb8cc70536b7a6df (diff) | |
download | gitea-89b4e0477b4e1b9e1cccd87b68fde4ea8a578e9c.tar.gz gitea-89b4e0477b4e1b9e1cccd87b68fde4ea8a578e9c.zip |
Refactor code indexer (#9313)
* Refactor code indexer
* fix test
* fix test
* refactor code indexer
* fix import
* improve code
* fix typo
* fix test and make code clean
* fix lint
Diffstat (limited to 'modules/indexer/code/bleve.go')
-rw-r--r-- | modules/indexer/code/bleve.go | 529 |
1 files changed, 247 insertions, 282 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index bb2fc5bc74..339dca74a1 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -9,182 +9,90 @@ import ( "os" "strconv" "strings" - "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/custom" + "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/mapping" + "github.com/blevesearch/bleve/search/query" "github.com/ethantkoenig/rupture" ) -type repoIndexerOperation struct { - repoID int64 - deleted bool - watchers []chan<- error -} - -var repoIndexerOperationQueue chan repoIndexerOperation +const unicodeNormalizeName = "unicodeNormalize" +const maxBatchSize = 16 -// InitRepoIndexer initialize the repo indexer -func InitRepoIndexer() { - if !setting.Indexer.RepoIndexerEnabled { - return - } - waitChannel := make(chan time.Duration) - // FIXME: graceful: This should use a persistable queue - repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) - go func() { - start := time.Now() - log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) - initRepoIndexer(populateRepoIndexerAsynchronously) - go processRepoIndexerOperationQueue() - waitChannel <- time.Since(start) - }() - if setting.Indexer.StartupTimeout > 0 { - go func() { - timeout := setting.Indexer.StartupTimeout - if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { - timeout += setting.GracefulHammerTime - } - select { - case duration := <-waitChannel: - log.Info("Repository Indexer Initialization took %v", duration) - case <-time.After(timeout): - log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) - } - }() - - } +// indexerID a bleve-compatible unique identifier for an integer id +func indexerID(id int64) string { + return strconv.FormatInt(id, 36) } -// populateRepoIndexerAsynchronously asynchronously populates the repo indexer -// with pre-existing data. This should only be run when the indexer is created -// for the first time. -func populateRepoIndexerAsynchronously() error { - exist, err := models.IsTableNotEmpty("repository") - if err != nil { - return err - } else if !exist { - return nil - } - - var maxRepoID int64 - if maxRepoID, err = models.GetMaxID("repository"); err != nil { - return err - } - go populateRepoIndexer(maxRepoID) - return nil +// numericEqualityQuery a numeric equality query for the given value and field +func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q } -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -// FIXME: graceful: This should use a persistable queue -func populateRepoIndexer(maxRepoID int64) { - log.Info("Populating the repo indexer with existing repositories") - - isShutdown := graceful.GetManager().IsShutdown() - - // start with the maximum existing repo ID and work backwards, so that we - // don't include repos that are created after gitea starts; such repos will - // already be added to the indexer, and we don't need to add them again. - for maxRepoID > 0 { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) - if err != nil { - log.Error("populateRepoIndexer: %v", err) - return - } else if len(ids) == 0 { - break - } - for _, id := range ids { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: id, - deleted: false, - } - maxRepoID = id - 1 - } - } - log.Info("Done (re)populating the repo indexer with existing repositories") +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) } -func updateRepoIndexer(repoID int64) error { - repo, err := models.GetRepositoryByID(repoID) - if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err } - sha, err := getDefaultBranchSha(repo) + metadata, err := rupture.ReadIndexMetadata(path) if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) + return nil, err } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) - } else if changes == nil { - return nil + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) } - batch := RepoIndexerBatch() - for _, update := range changes.Updates { - if err := addUpdate(update, repo, batch); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) - } - } - for _, filename := range changes.RemovedFilenames { - if err := addDelete(filename, repo, batch); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) - } - } - if err = batch.Flush(); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err } - return repo.UpdateIndexerStatus(sha) + return index, nil } -// repoChanges changes (file additions/updates/removals) to a repo -type repoChanges struct { - Updates []fileUpdate - RemovedFilenames []string +// RepoIndexerData data stored in the repo indexer +type RepoIndexerData struct { + RepoID int64 + Content string } -type fileUpdate struct { - Filename string - BlobSha string -} - -func getDefaultBranchSha(repo *models.Repository) (string, error) { - stdout, err := git.NewCommand("show-ref", "-s", git.BranchPrefix+repo.DefaultBranch).RunInDir(repo.RepoPath()) - if err != nil { - return "", err - } - return strings.TrimSpace(stdout), nil -} - -// getRepoChanges returns changes to repo since last indexer update -func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) { - if err := repo.GetIndexerStatus(); err != nil { - return nil, err - } - - if len(repo.IndexerStatus.CommitSha) == 0 { - return genesisChanges(repo, revision) - } - return nonGenesisChanges(repo, revision) +// Type returns the document type, for bleve's mapping.Classifier interface. +func (d *RepoIndexerData) Type() string { + return repoIndexerDocType } func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { @@ -207,174 +115,231 @@ func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.Flushin // FIXME: UTF-16 files will probably fail here return nil } - indexerUpdate := RepoIndexerUpdate{ - Filepath: update.Filename, - Op: RepoIndexerOpUpdate, - Data: &RepoIndexerData{ - RepoID: repo.ID, - Content: string(charset.ToUTF8DropErrors(fileContents)), - }, - } - return indexerUpdate.AddToFlushingBatch(batch) + + id := filenameIndexerID(repo.ID, update.Filename) + return batch.Index(id, &RepoIndexerData{ + RepoID: repo.ID, + Content: string(charset.ToUTF8DropErrors(fileContents)), + }) } func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { - indexerUpdate := RepoIndexerUpdate{ - Filepath: filename, - Op: RepoIndexerOpDelete, - Data: &RepoIndexerData{ - RepoID: repo.ID, - }, - } - return indexerUpdate.AddToFlushingBatch(batch) + id := filenameIndexerID(repo.ID, filename) + return batch.Delete(id) } -func isIndexable(entry *git.TreeEntry) bool { - if !entry.IsRegular() && !entry.IsExecutable() { - return false +const ( + repoIndexerAnalyzer = "repoIndexerAnalyzer" + repoIndexerDocType = "repoIndexerDocType" + repoIndexerLatestVersion = 4 +) + +// createRepoIndexer create a repo indexer if one does not already exist +func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) { + docMapping := bleve.NewDocumentMapping() + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + + mapping := bleve.NewIndexMapping() + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return nil, err + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, lowercase.Name}, + }); err != nil { + return nil, err } - name := strings.ToLower(entry.Name()) - for _, g := range setting.Indexer.ExcludePatterns { - if g.Match(name) { - return false - } + mapping.DefaultAnalyzer = repoIndexerAnalyzer + mapping.AddDocumentMapping(repoIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + + indexer, err := bleve.New(path, mapping) + if err != nil { + return nil, err } - for _, g := range setting.Indexer.IncludePatterns { - if g.Match(name) { - return true - } + + if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + Version: latestVersion, + }); err != nil { + return nil, err } - return len(setting.Indexer.IncludePatterns) == 0 + return indexer, nil +} + +func filenameIndexerID(repoID int64, filename string) string { + return indexerID(repoID) + "_" + filename } -// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command -func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { - entries, err := git.ParseTreeEntries(stdout) +func filenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} + +var ( + _ Indexer = &BleveIndexer{} +) + +// BleveIndexer represents a bleve indexer implementation +type BleveIndexer struct { + indexDir string + indexer bleve.Index +} + +// NewBleveIndexer creates a new bleve local indexer +func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { + indexer := &BleveIndexer{ + indexDir: indexDir, + } + created, err := indexer.init() + return indexer, created, err +} + +// init init the indexer +func (b *BleveIndexer) init() (bool, error) { + var err error + b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion) if err != nil { - return nil, err + return false, err } - var idxCount = 0 - updates := make([]fileUpdate, len(entries)) - for _, entry := range entries { - if isIndexable(entry) { - updates[idxCount] = fileUpdate{ - Filename: entry.Name(), - BlobSha: entry.ID.String(), - } - idxCount++ + if b.indexer != nil { + return false, nil + } + + b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion) + if err != nil { + return false, err + } + + return true, nil +} + +// Close close the indexer +func (b *BleveIndexer) Close() { + log.Debug("Closing repo indexer") + if b.indexer != nil { + err := b.indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) } } - return updates[:idxCount], nil + log.Info("PID: %d Repository Indexer closed", os.Getpid()) } -// genesisChanges get changes to add repo to the indexer for the first time -func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { - var changes repoChanges - stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). - RunInDirBytes(repo.RepoPath()) +// Index indexes the data +func (b *BleveIndexer) Index(repoID int64) error { + repo, err := models.GetRepositoryByID(repoID) if err != nil { - return nil, err + return err } - changes.Updates, err = parseGitLsTreeOutput(stdout) - return &changes, err -} -// nonGenesisChanges get changes since the previous indexer update -func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { - diffCmd := git.NewCommand("diff", "--name-status", - repo.IndexerStatus.CommitSha, revision) - stdout, err := diffCmd.RunInDir(repo.RepoPath()) + sha, err := getDefaultBranchSha(repo) if err != nil { - // previous commit sha may have been removed by a force push, so - // try rebuilding from scratch - log.Warn("git diff: %v", err) - if err = deleteRepoFromIndexer(repo.ID); err != nil { - return nil, err - } - return genesisChanges(repo, revision) + return err + } + changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil } - var changes repoChanges - updatedFilenames := make([]string, 0, 10) - for _, line := range strings.Split(stdout, "\n") { - line = strings.TrimSpace(line) - if len(line) == 0 { - continue - } - filename := strings.TrimSpace(line[1:]) - if len(filename) == 0 { - continue - } else if filename[0] == '"' { - filename, err = strconv.Unquote(filename) - if err != nil { - return nil, err - } - } - switch status := line[0]; status { - case 'M', 'A': - updatedFilenames = append(updatedFilenames, filename) - case 'D': - changes.RemovedFilenames = append(changes.RemovedFilenames, filename) - default: - log.Warn("Unrecognized status: %c (line=%s)", status, line) + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, update := range changes.Updates { + if err := addUpdate(update, repo, batch); err != nil { + return err } } + for _, filename := range changes.RemovedFilenames { + if err := addDelete(filename, repo, batch); err != nil { + return err + } + } + if err = batch.Flush(); err != nil { + return err + } + return repo.UpdateIndexerStatus(sha) +} - cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") - cmd.AddArguments(updatedFilenames...) - lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath()) +// Delete deletes indexes by ids +func (b *BleveIndexer) Delete(repoID int64) error { + query := numericEqualityQuery(repoID, "RepoID") + searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) + result, err := b.indexer.Search(searchRequest) if err != nil { - return nil, err + return err + } + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, hit := range result.Hits { + if err = batch.Delete(hit.ID); err != nil { + return err + } } - changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) - return &changes, err + return batch.Flush() } -func processRepoIndexerOperationQueue() { - for { - select { - case op := <-repoIndexerOperationQueue: - var err error - if op.deleted { - if err = deleteRepoFromIndexer(op.repoID); err != nil { - log.Error("DeleteRepoFromIndexer: %v", err) - } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) - } - } - for _, watcher := range op.watchers { - watcher <- err - } - case <-graceful.GetManager().IsShutdown(): - log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) - return +// Search searches for files in the specified repo. +// Returns the matching file-paths +func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) { + phraseQuery := bleve.NewMatchPhraseQuery(keyword) + phraseQuery.FieldVal = "Content" + phraseQuery.Analyzer = repoIndexerAnalyzer + + var indexerQuery query.Query + if len(repoIDs) > 0 { + var repoQueries = make([]query.Query, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) } + indexerQuery = bleve.NewConjunctionQuery( + bleve.NewDisjunctionQuery(repoQueries...), + phraseQuery, + ) + } else { + indexerQuery = phraseQuery } -} - -// DeleteRepoFromIndexer remove all of a repository's entries from the indexer -func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) -} -// UpdateRepoIndexer update a repository's entries in the indexer -func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) -} + from := (page - 1) * pageSize + searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) + searchRequest.Fields = []string{"Content", "RepoID"} + searchRequest.IncludeLocations = true -func addOperationToQueue(op repoIndexerOperation) { - if !setting.Indexer.RepoIndexerEnabled { - return + result, err := b.indexer.Search(searchRequest) + if err != nil { + return 0, nil, err } - select { - case repoIndexerOperationQueue <- op: - break - default: - go func() { - repoIndexerOperationQueue <- op - }() + + searchResults := make([]*SearchResult, len(result.Hits)) + for i, hit := range result.Hits { + var startIndex, endIndex int = -1, -1 + for _, locations := range hit.Locations["Content"] { + location := locations[0] + locationStart := int(location.Start) + locationEnd := int(location.End) + if startIndex < 0 || locationStart < startIndex { + startIndex = locationStart + } + if endIndex < 0 || locationEnd > endIndex { + endIndex = locationEnd + } + } + searchResults[i] = &SearchResult{ + RepoID: int64(hit.Fields["RepoID"].(float64)), + StartIndex: startIndex, + EndIndex: endIndex, + Filename: filenameOfIndexerID(hit.ID), + Content: hit.Fields["Content"].(string), + } } + return int64(result.Total), searchResults, nil } |