diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2019-12-23 20:31:16 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-23 20:31:16 +0800 |
commit | 89b4e0477b4e1b9e1cccd87b68fde4ea8a578e9c (patch) | |
tree | 46141746472fc33dfa8c262fa96176895bc9fba8 /modules | |
parent | 2f9564f993ba02ba503d7088eb8cc70536b7a6df (diff) | |
download | gitea-89b4e0477b4e1b9e1cccd87b68fde4ea8a578e9c.tar.gz gitea-89b4e0477b4e1b9e1cccd87b68fde4ea8a578e9c.zip |
Refactor code indexer (#9313)
* Refactor code indexer
* fix test
* fix test
* refactor code indexer
* fix import
* improve code
* fix typo
* fix test and make code clean
* fix lint
Diffstat (limited to 'modules')
-rw-r--r-- | modules/indexer/code/bleve.go | 529 | ||||
-rw-r--r-- | modules/indexer/code/bleve_test.go | 54 | ||||
-rw-r--r-- | modules/indexer/code/git.go | 147 | ||||
-rw-r--r-- | modules/indexer/code/indexer.go | 107 | ||||
-rw-r--r-- | modules/indexer/code/queue.go | 133 | ||||
-rw-r--r-- | modules/indexer/code/repo.go | 290 | ||||
-rw-r--r-- | modules/indexer/code/search.go (renamed from modules/search/search.go) | 7 | ||||
-rw-r--r-- | modules/indexer/issues/db.go | 2 | ||||
-rw-r--r-- | modules/indexer/issues/indexer.go | 2 | ||||
-rw-r--r-- | modules/setting/indexer.go | 2 |
10 files changed, 642 insertions, 631 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index bb2fc5bc74..339dca74a1 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -9,182 +9,90 @@ import ( "os" "strconv" "strings" - "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/custom" + "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/mapping" + "github.com/blevesearch/bleve/search/query" "github.com/ethantkoenig/rupture" ) -type repoIndexerOperation struct { - repoID int64 - deleted bool - watchers []chan<- error -} - -var repoIndexerOperationQueue chan repoIndexerOperation +const unicodeNormalizeName = "unicodeNormalize" +const maxBatchSize = 16 -// InitRepoIndexer initialize the repo indexer -func InitRepoIndexer() { - if !setting.Indexer.RepoIndexerEnabled { - return - } - waitChannel := make(chan time.Duration) - // FIXME: graceful: This should use a persistable queue - repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) - go func() { - start := time.Now() - log.Info("PID: %d: Initializing Repository Indexer", os.Getpid()) - initRepoIndexer(populateRepoIndexerAsynchronously) - go processRepoIndexerOperationQueue() - waitChannel <- time.Since(start) - }() - if setting.Indexer.StartupTimeout > 0 { - go func() { - timeout := setting.Indexer.StartupTimeout - if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { - timeout += setting.GracefulHammerTime - } - select { - case duration := <-waitChannel: - log.Info("Repository Indexer Initialization took %v", duration) - case <-time.After(timeout): - log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) - } - }() - - } +// indexerID a bleve-compatible unique identifier for an integer id +func indexerID(id int64) string { + return strconv.FormatInt(id, 36) } -// populateRepoIndexerAsynchronously asynchronously populates the repo indexer -// with pre-existing data. This should only be run when the indexer is created -// for the first time. -func populateRepoIndexerAsynchronously() error { - exist, err := models.IsTableNotEmpty("repository") - if err != nil { - return err - } else if !exist { - return nil - } - - var maxRepoID int64 - if maxRepoID, err = models.GetMaxID("repository"); err != nil { - return err - } - go populateRepoIndexer(maxRepoID) - return nil +// numericEqualityQuery a numeric equality query for the given value and field +func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q } -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -// FIXME: graceful: This should use a persistable queue -func populateRepoIndexer(maxRepoID int64) { - log.Info("Populating the repo indexer with existing repositories") - - isShutdown := graceful.GetManager().IsShutdown() - - // start with the maximum existing repo ID and work backwards, so that we - // don't include repos that are created after gitea starts; such repos will - // already be added to the indexer, and we don't need to add them again. - for maxRepoID > 0 { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) - if err != nil { - log.Error("populateRepoIndexer: %v", err) - return - } else if len(ids) == 0 { - break - } - for _, id := range ids { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: id, - deleted: false, - } - maxRepoID = id - 1 - } - } - log.Info("Done (re)populating the repo indexer with existing repositories") +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) } -func updateRepoIndexer(repoID int64) error { - repo, err := models.GetRepositoryByID(repoID) - if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err) +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err } - sha, err := getDefaultBranchSha(repo) + metadata, err := rupture.ReadIndexMetadata(path) if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err) + return nil, err } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) - } else if changes == nil { - return nil + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) } - batch := RepoIndexerBatch() - for _, update := range changes.Updates { - if err := addUpdate(update, repo, batch); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err) - } - } - for _, filename := range changes.RemovedFilenames { - if err := addDelete(filename, repo, batch); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err) - } - } - if err = batch.Flush(); err != nil { - return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err } - return repo.UpdateIndexerStatus(sha) + return index, nil } -// repoChanges changes (file additions/updates/removals) to a repo -type repoChanges struct { - Updates []fileUpdate - RemovedFilenames []string +// RepoIndexerData data stored in the repo indexer +type RepoIndexerData struct { + RepoID int64 + Content string } -type fileUpdate struct { - Filename string - BlobSha string -} - -func getDefaultBranchSha(repo *models.Repository) (string, error) { - stdout, err := git.NewCommand("show-ref", "-s", git.BranchPrefix+repo.DefaultBranch).RunInDir(repo.RepoPath()) - if err != nil { - return "", err - } - return strings.TrimSpace(stdout), nil -} - -// getRepoChanges returns changes to repo since last indexer update -func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) { - if err := repo.GetIndexerStatus(); err != nil { - return nil, err - } - - if len(repo.IndexerStatus.CommitSha) == 0 { - return genesisChanges(repo, revision) - } - return nonGenesisChanges(repo, revision) +// Type returns the document type, for bleve's mapping.Classifier interface. +func (d *RepoIndexerData) Type() string { + return repoIndexerDocType } func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { @@ -207,174 +115,231 @@ func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.Flushin // FIXME: UTF-16 files will probably fail here return nil } - indexerUpdate := RepoIndexerUpdate{ - Filepath: update.Filename, - Op: RepoIndexerOpUpdate, - Data: &RepoIndexerData{ - RepoID: repo.ID, - Content: string(charset.ToUTF8DropErrors(fileContents)), - }, - } - return indexerUpdate.AddToFlushingBatch(batch) + + id := filenameIndexerID(repo.ID, update.Filename) + return batch.Index(id, &RepoIndexerData{ + RepoID: repo.ID, + Content: string(charset.ToUTF8DropErrors(fileContents)), + }) } func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { - indexerUpdate := RepoIndexerUpdate{ - Filepath: filename, - Op: RepoIndexerOpDelete, - Data: &RepoIndexerData{ - RepoID: repo.ID, - }, - } - return indexerUpdate.AddToFlushingBatch(batch) + id := filenameIndexerID(repo.ID, filename) + return batch.Delete(id) } -func isIndexable(entry *git.TreeEntry) bool { - if !entry.IsRegular() && !entry.IsExecutable() { - return false +const ( + repoIndexerAnalyzer = "repoIndexerAnalyzer" + repoIndexerDocType = "repoIndexerDocType" + repoIndexerLatestVersion = 4 +) + +// createRepoIndexer create a repo indexer if one does not already exist +func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) { + docMapping := bleve.NewDocumentMapping() + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + + mapping := bleve.NewIndexMapping() + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return nil, err + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, lowercase.Name}, + }); err != nil { + return nil, err } - name := strings.ToLower(entry.Name()) - for _, g := range setting.Indexer.ExcludePatterns { - if g.Match(name) { - return false - } + mapping.DefaultAnalyzer = repoIndexerAnalyzer + mapping.AddDocumentMapping(repoIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + + indexer, err := bleve.New(path, mapping) + if err != nil { + return nil, err } - for _, g := range setting.Indexer.IncludePatterns { - if g.Match(name) { - return true - } + + if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + Version: latestVersion, + }); err != nil { + return nil, err } - return len(setting.Indexer.IncludePatterns) == 0 + return indexer, nil +} + +func filenameIndexerID(repoID int64, filename string) string { + return indexerID(repoID) + "_" + filename } -// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command -func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { - entries, err := git.ParseTreeEntries(stdout) +func filenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} + +var ( + _ Indexer = &BleveIndexer{} +) + +// BleveIndexer represents a bleve indexer implementation +type BleveIndexer struct { + indexDir string + indexer bleve.Index +} + +// NewBleveIndexer creates a new bleve local indexer +func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { + indexer := &BleveIndexer{ + indexDir: indexDir, + } + created, err := indexer.init() + return indexer, created, err +} + +// init init the indexer +func (b *BleveIndexer) init() (bool, error) { + var err error + b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion) if err != nil { - return nil, err + return false, err } - var idxCount = 0 - updates := make([]fileUpdate, len(entries)) - for _, entry := range entries { - if isIndexable(entry) { - updates[idxCount] = fileUpdate{ - Filename: entry.Name(), - BlobSha: entry.ID.String(), - } - idxCount++ + if b.indexer != nil { + return false, nil + } + + b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion) + if err != nil { + return false, err + } + + return true, nil +} + +// Close close the indexer +func (b *BleveIndexer) Close() { + log.Debug("Closing repo indexer") + if b.indexer != nil { + err := b.indexer.Close() + if err != nil { + log.Error("Error whilst closing the repository indexer: %v", err) } } - return updates[:idxCount], nil + log.Info("PID: %d Repository Indexer closed", os.Getpid()) } -// genesisChanges get changes to add repo to the indexer for the first time -func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { - var changes repoChanges - stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). - RunInDirBytes(repo.RepoPath()) +// Index indexes the data +func (b *BleveIndexer) Index(repoID int64) error { + repo, err := models.GetRepositoryByID(repoID) if err != nil { - return nil, err + return err } - changes.Updates, err = parseGitLsTreeOutput(stdout) - return &changes, err -} -// nonGenesisChanges get changes since the previous indexer update -func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { - diffCmd := git.NewCommand("diff", "--name-status", - repo.IndexerStatus.CommitSha, revision) - stdout, err := diffCmd.RunInDir(repo.RepoPath()) + sha, err := getDefaultBranchSha(repo) if err != nil { - // previous commit sha may have been removed by a force push, so - // try rebuilding from scratch - log.Warn("git diff: %v", err) - if err = deleteRepoFromIndexer(repo.ID); err != nil { - return nil, err - } - return genesisChanges(repo, revision) + return err + } + changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil } - var changes repoChanges - updatedFilenames := make([]string, 0, 10) - for _, line := range strings.Split(stdout, "\n") { - line = strings.TrimSpace(line) - if len(line) == 0 { - continue - } - filename := strings.TrimSpace(line[1:]) - if len(filename) == 0 { - continue - } else if filename[0] == '"' { - filename, err = strconv.Unquote(filename) - if err != nil { - return nil, err - } - } - switch status := line[0]; status { - case 'M', 'A': - updatedFilenames = append(updatedFilenames, filename) - case 'D': - changes.RemovedFilenames = append(changes.RemovedFilenames, filename) - default: - log.Warn("Unrecognized status: %c (line=%s)", status, line) + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, update := range changes.Updates { + if err := addUpdate(update, repo, batch); err != nil { + return err } } + for _, filename := range changes.RemovedFilenames { + if err := addDelete(filename, repo, batch); err != nil { + return err + } + } + if err = batch.Flush(); err != nil { + return err + } + return repo.UpdateIndexerStatus(sha) +} - cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") - cmd.AddArguments(updatedFilenames...) - lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath()) +// Delete deletes indexes by ids +func (b *BleveIndexer) Delete(repoID int64) error { + query := numericEqualityQuery(repoID, "RepoID") + searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) + result, err := b.indexer.Search(searchRequest) if err != nil { - return nil, err + return err + } + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, hit := range result.Hits { + if err = batch.Delete(hit.ID); err != nil { + return err + } } - changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) - return &changes, err + return batch.Flush() } -func processRepoIndexerOperationQueue() { - for { - select { - case op := <-repoIndexerOperationQueue: - var err error - if op.deleted { - if err = deleteRepoFromIndexer(op.repoID); err != nil { - log.Error("DeleteRepoFromIndexer: %v", err) - } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) - } - } - for _, watcher := range op.watchers { - watcher <- err - } - case <-graceful.GetManager().IsShutdown(): - log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) - return +// Search searches for files in the specified repo. +// Returns the matching file-paths +func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) { + phraseQuery := bleve.NewMatchPhraseQuery(keyword) + phraseQuery.FieldVal = "Content" + phraseQuery.Analyzer = repoIndexerAnalyzer + + var indexerQuery query.Query + if len(repoIDs) > 0 { + var repoQueries = make([]query.Query, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) } + indexerQuery = bleve.NewConjunctionQuery( + bleve.NewDisjunctionQuery(repoQueries...), + phraseQuery, + ) + } else { + indexerQuery = phraseQuery } -} - -// DeleteRepoFromIndexer remove all of a repository's entries from the indexer -func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) -} -// UpdateRepoIndexer update a repository's entries in the indexer -func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) -} + from := (page - 1) * pageSize + searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) + searchRequest.Fields = []string{"Content", "RepoID"} + searchRequest.IncludeLocations = true -func addOperationToQueue(op repoIndexerOperation) { - if !setting.Indexer.RepoIndexerEnabled { - return + result, err := b.indexer.Search(searchRequest) + if err != nil { + return 0, nil, err } - select { - case repoIndexerOperationQueue <- op: - break - default: - go func() { - repoIndexerOperationQueue <- op - }() + + searchResults := make([]*SearchResult, len(result.Hits)) + for i, hit := range result.Hits { + var startIndex, endIndex int = -1, -1 + for _, locations := range hit.Locations["Content"] { + location := locations[0] + locationStart := int(location.Start) + locationEnd := int(location.End) + if startIndex < 0 || locationStart < startIndex { + startIndex = locationStart + } + if endIndex < 0 || locationEnd > endIndex { + endIndex = locationEnd + } + } + searchResults[i] = &SearchResult{ + RepoID: int64(hit.Fields["RepoID"].(float64)), + StartIndex: startIndex, + EndIndex: endIndex, + Filename: filenameOfIndexerID(hit.ID), + Content: hit.Fields["Content"].(string), + } } + return int64(result.Total), searchResults, nil } diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go index 2eafeef3c5..ac2b411998 100644 --- a/modules/indexer/code/bleve_test.go +++ b/modules/indexer/code/bleve_test.go @@ -5,12 +5,66 @@ package code import ( + "os" "path/filepath" "testing" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + + "github.com/stretchr/testify/assert" ) func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..", "..")) } + +func TestIndexAndSearch(t *testing.T) { + models.PrepareTestEnv(t) + + dir := "./bleve.index" + os.RemoveAll(dir) + + setting.Indexer.RepoIndexerEnabled = true + idx, _, err := NewBleveIndexer(dir) + if err != nil { + idx.Close() + log.Fatal("indexer.Init: %v", err) + } + + err = idx.Index(1) + assert.NoError(t, err) + + var ( + keywords = []struct { + Keyword string + IDs []int64 + }{ + { + Keyword: "Description", + IDs: []int64{1}, + }, + { + Keyword: "repo1", + IDs: []int64{1}, + }, + { + Keyword: "non-exist", + IDs: []int64{}, + }, + } + ) + + for _, kw := range keywords { + total, res, err := idx.Search(nil, kw.Keyword, 1, 10) + assert.NoError(t, err) + assert.EqualValues(t, len(kw.IDs), total) + + var ids = make([]int64, 0, len(res)) + for _, hit := range res { + ids = append(ids, hit.RepoID) + } + assert.EqualValues(t, kw.IDs, ids) + } +} diff --git a/modules/indexer/code/git.go b/modules/indexer/code/git.go new file mode 100644 index 0000000000..bfa7d20438 --- /dev/null +++ b/modules/indexer/code/git.go @@ -0,0 +1,147 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "strconv" + "strings" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +type fileUpdate struct { + Filename string + BlobSha string +} + +// repoChanges changes (file additions/updates/removals) to a repo +type repoChanges struct { + Updates []fileUpdate + RemovedFilenames []string +} + +func getDefaultBranchSha(repo *models.Repository) (string, error) { + stdout, err := git.NewCommand("show-ref", "-s", git.BranchPrefix+repo.DefaultBranch).RunInDir(repo.RepoPath()) + if err != nil { + return "", err + } + return strings.TrimSpace(stdout), nil +} + +// getRepoChanges returns changes to repo since last indexer update +func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) { + if err := repo.GetIndexerStatus(); err != nil { + return nil, err + } + + if len(repo.IndexerStatus.CommitSha) == 0 { + return genesisChanges(repo, revision) + } + return nonGenesisChanges(repo, revision) +} + +func isIndexable(entry *git.TreeEntry) bool { + if !entry.IsRegular() && !entry.IsExecutable() { + return false + } + name := strings.ToLower(entry.Name()) + for _, g := range setting.Indexer.ExcludePatterns { + if g.Match(name) { + return false + } + } + for _, g := range setting.Indexer.IncludePatterns { + if g.Match(name) { + return true + } + } + return len(setting.Indexer.IncludePatterns) == 0 +} + +// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command +func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { + entries, err := git.ParseTreeEntries(stdout) + if err != nil { + return nil, err + } + var idxCount = 0 + updates := make([]fileUpdate, len(entries)) + for _, entry := range entries { + if isIndexable(entry) { + updates[idxCount] = fileUpdate{ + Filename: entry.Name(), + BlobSha: entry.ID.String(), + } + idxCount++ + } + } + return updates[:idxCount], nil +} + +// genesisChanges get changes to add repo to the indexer for the first time +func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { + var changes repoChanges + stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). + RunInDirBytes(repo.RepoPath()) + if err != nil { + return nil, err + } + changes.Updates, err = parseGitLsTreeOutput(stdout) + return &changes, err +} + +// nonGenesisChanges get changes since the previous indexer update +func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { + diffCmd := git.NewCommand("diff", "--name-status", + repo.IndexerStatus.CommitSha, revision) + stdout, err := diffCmd.RunInDir(repo.RepoPath()) + if err != nil { + // previous commit sha may have been removed by a force push, so + // try rebuilding from scratch + log.Warn("git diff: %v", err) + if err = indexer.Delete(repo.ID); err != nil { + return nil, err + } + return genesisChanges(repo, revision) + } + var changes repoChanges + updatedFilenames := make([]string, 0, 10) + for _, line := range strings.Split(stdout, "\n") { + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + filename := strings.TrimSpace(line[1:]) + if len(filename) == 0 { + continue + } else if filename[0] == '"' { + filename, err = strconv.Unquote(filename) + if err != nil { + return nil, err + } + } + + switch status := line[0]; status { + case 'M', 'A': + updatedFilenames = append(updatedFilenames, filename) + case 'D': + changes.RemovedFilenames = append(changes.RemovedFilenames, filename) + default: + log.Warn("Unrecognized status: %c (line=%s)", status, line) + } + } + + cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") + cmd.AddArguments(updatedFilenames...) + lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath()) + if err != nil { + return nil, err + } + changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) + return &changes, err +} diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 3907a7b57d..c68c7c2d71 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -5,72 +5,73 @@ package code import ( - "os" - "strconv" + "time" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" +) - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/analysis/token/unicodenorm" - "github.com/blevesearch/bleve/index/upsidedown" - "github.com/blevesearch/bleve/mapping" - "github.com/blevesearch/bleve/search/query" - "github.com/ethantkoenig/rupture" +var ( + indexer Indexer ) -// indexerID a bleve-compatible unique identifier for an integer id -func indexerID(id int64) string { - return strconv.FormatInt(id, 36) +// SearchResult result of performing a search in a repo +type SearchResult struct { + RepoID int64 + StartIndex int + EndIndex int + Filename string + Content string } -// numericEqualityQuery a numeric equality query for the given value and field -func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { - f := float64(value) - tru := true - q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) - q.SetField(field) - return q +// Indexer defines an interface to indexer issues contents +type Indexer interface { + Index(repoID int64) error + Delete(repoID int64) error + Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) + Close() } -const unicodeNormalizeName = "unicodeNormalize" +// Init initialize the repo indexer +func Init() { + if !setting.Indexer.RepoIndexerEnabled { + return + } -func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { - return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ - "type": unicodenorm.Name, - "form": unicodenorm.NFC, - }) -} + waitChannel := make(chan time.Duration) + go func() { + start := time.Now() + log.Info("Initializing Repository Indexer") + var created bool + var err error + indexer, created, err = NewBleveIndexer(setting.Indexer.RepoPath) + if err != nil { + indexer.Close() + log.Fatal("indexer.Init: %v", err) + } -const maxBatchSize = 16 + go processRepoIndexerOperationQueue(indexer) -// openIndexer open the index at the specified path, checking for metadata -// updates and bleve version updates. If index needs to be created (or -// re-created), returns (nil, nil) -func openIndexer(path string, latestVersion int) (bleve.Index, error) { - _, err := os.Stat(setting.Indexer.IssuePath) - if err != nil && os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, err - } + if created { + go populateRepoIndexer() + } - metadata, err := rupture.ReadIndexMetadata(path) - if err != nil { - return nil, err - } - if metadata.Version < latestVersion { - // the indexer is using a previous version, so we should delete it and - // re-populate - return nil, os.RemoveAll(path) - } + waitChannel <- time.Since(start) + }() - index, err := bleve.Open(path) - if err != nil && err == upsidedown.IncompatibleVersion { - // the indexer was built with a previous version of bleve, so we should - // delete it and re-populate - return nil, os.RemoveAll(path) - } else if err != nil { - return nil, err + if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Repository Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) + } + }() } - return index, nil } diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go new file mode 100644 index 0000000000..90f7d1bb19 --- /dev/null +++ b/modules/indexer/code/queue.go @@ -0,0 +1,133 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "os" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +type repoIndexerOperation struct { + repoID int64 + deleted bool + watchers []chan<- error +} + +var repoIndexerOperationQueue chan repoIndexerOperation + +func processRepoIndexerOperationQueue(indexer Indexer) { + defer indexer.Close() + + repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) + for { + select { + case op := <-repoIndexerOperationQueue: + var err error + if op.deleted { + if err = indexer.Delete(op.repoID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else { + if err = indexer.Index(op.repoID); err != nil { + log.Error("indexer.Index: %v", err) + } + } + for _, watcher := range op.watchers { + watcher <- err + } + case <-graceful.GetManager().IsShutdown(): + log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) + return + } + } +} + +// DeleteRepoFromIndexer remove all of a repository's entries from the indexer +func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { + addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) +} + +// UpdateRepoIndexer update a repository's entries in the indexer +func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { + addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) +} + +func addOperationToQueue(op repoIndexerOperation) { + if !setting.Indexer.RepoIndexerEnabled { + return + } + select { + case repoIndexerOperationQueue <- op: + break + default: + go func() { + repoIndexerOperationQueue <- op + }() + } +} + +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer() { + log.Info("Populating the repo indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + + exist, err := models.IsTableNotEmpty("repository") + if err != nil { + log.Fatal("System error: %v", err) + } else if !exist { + return + } + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("System error: %v", err) + } + + var maxRepoID int64 + if maxRepoID, err = models.GetMaxID("repository"); err != nil { + log.Fatal("System error: %v", err) + } + + // start with the maximum existing repo ID and work backwards, so that we + // don't include repos that are created after gitea starts; such repos will + // already be added to the indexer, and we don't need to add them again. + for maxRepoID > 0 { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50) + if err != nil { + log.Error("populateRepoIndexer: %v", err) + return + } else if len(ids) == 0 { + break + } + for _, id := range ids { + select { + case <-isShutdown: + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + repoIndexerOperationQueue <- repoIndexerOperation{ + repoID: id, + deleted: false, + } + maxRepoID = id - 1 + } + } + log.Info("Done (re)populating the repo indexer with existing repositories") +} diff --git a/modules/indexer/code/repo.go b/modules/indexer/code/repo.go deleted file mode 100644 index bc5f317b7d..0000000000 --- a/modules/indexer/code/repo.go +++ /dev/null @@ -1,290 +0,0 @@ -// Copyright 2017 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package code - -import ( - "context" - "os" - "strings" - "sync" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" - - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/analysis/analyzer/custom" - "github.com/blevesearch/bleve/analysis/token/lowercase" - "github.com/blevesearch/bleve/analysis/tokenizer/unicode" - "github.com/blevesearch/bleve/search/query" - "github.com/ethantkoenig/rupture" -) - -const ( - repoIndexerAnalyzer = "repoIndexerAnalyzer" - repoIndexerDocType = "repoIndexerDocType" - - repoIndexerLatestVersion = 4 -) - -type bleveIndexerHolder struct { - index bleve.Index - mutex sync.RWMutex - cond *sync.Cond -} - -func newBleveIndexerHolder() *bleveIndexerHolder { - b := &bleveIndexerHolder{} - b.cond = sync.NewCond(b.mutex.RLocker()) - return b -} - -func (r *bleveIndexerHolder) set(index bleve.Index) { - r.mutex.Lock() - defer r.mutex.Unlock() - r.index = index - r.cond.Broadcast() -} - -func (r *bleveIndexerHolder) get() bleve.Index { - r.mutex.RLock() - defer r.mutex.RUnlock() - if r.index == nil { - r.cond.Wait() - } - return r.index -} - -// repoIndexer (thread-safe) index for repository contents -var indexerHolder = newBleveIndexerHolder() - -// RepoIndexerOp type of operation to perform on repo indexer -type RepoIndexerOp int - -const ( - // RepoIndexerOpUpdate add/update a file's contents - RepoIndexerOpUpdate = iota - - // RepoIndexerOpDelete delete a file - RepoIndexerOpDelete -) - -// RepoIndexerData data stored in the repo indexer -type RepoIndexerData struct { - RepoID int64 - Content string -} - -// Type returns the document type, for bleve's mapping.Classifier interface. -func (d *RepoIndexerData) Type() string { - return repoIndexerDocType -} - -// RepoIndexerUpdate an update to the repo indexer -type RepoIndexerUpdate struct { - Filepath string - Op RepoIndexerOp - Data *RepoIndexerData -} - -// AddToFlushingBatch adds the update to the given flushing batch. -func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { - id := filenameIndexerID(update.Data.RepoID, update.Filepath) - switch update.Op { - case RepoIndexerOpUpdate: - return batch.Index(id, update.Data) - case RepoIndexerOpDelete: - return batch.Delete(id) - default: - log.Error("Unrecognized repo indexer op: %d", update.Op) - } - return nil -} - -// initRepoIndexer initialize repo indexer -func initRepoIndexer(populateIndexer func() error) { - indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) - if err != nil { - log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err) - } - if indexer != nil { - indexerHolder.set(indexer) - closeAtTerminate() - - // Continue population from where left off - if err = populateIndexer(); err != nil { - log.Fatal("PopulateRepoIndex: %v", err) - } - return - } - - if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { - log.Fatal("CreateRepoIndexer: %v", err) - } - closeAtTerminate() - - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - log.Fatal("DeleteAllRepoIndexerStatus: %v", err) - } - - if err = populateIndexer(); err != nil { - log.Fatal("PopulateRepoIndex: %v", err) - } -} - -func closeAtTerminate() { - graceful.GetManager().RunAtTerminate(context.Background(), func() { - log.Debug("Closing repo indexer") - indexer := indexerHolder.get() - if indexer != nil { - err := indexer.Close() - if err != nil { - log.Error("Error whilst closing the repository indexer: %v", err) - } - } - log.Info("PID: %d Repository Indexer closed", os.Getpid()) - }) -} - -// createRepoIndexer create a repo indexer if one does not already exist -func createRepoIndexer(path string, latestVersion int) error { - docMapping := bleve.NewDocumentMapping() - numericFieldMapping := bleve.NewNumericFieldMapping() - numericFieldMapping.IncludeInAll = false - docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) - - textFieldMapping := bleve.NewTextFieldMapping() - textFieldMapping.IncludeInAll = false - docMapping.AddFieldMappingsAt("Content", textFieldMapping) - - mapping := bleve.NewIndexMapping() - if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { - return err - } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ - "type": custom.Name, - "char_filters": []string{}, - "tokenizer": unicode.Name, - "token_filters": []string{unicodeNormalizeName, lowercase.Name}, - }); err != nil { - return err - } - mapping.DefaultAnalyzer = repoIndexerAnalyzer - mapping.AddDocumentMapping(repoIndexerDocType, docMapping) - mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - - indexer, err := bleve.New(path, mapping) - if err != nil { - return err - } - indexerHolder.set(indexer) - - return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ - Version: latestVersion, - }) -} - -func filenameIndexerID(repoID int64, filename string) string { - return indexerID(repoID) + "_" + filename -} - -func filenameOfIndexerID(indexerID string) string { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - return indexerID[index+1:] -} - -// RepoIndexerBatch batch to add updates to -func RepoIndexerBatch() rupture.FlushingBatch { - return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) -} - -// deleteRepoFromIndexer delete all of a repo's files from indexer -func deleteRepoFromIndexer(repoID int64) error { - query := numericEqualityQuery(repoID, "RepoID") - searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := indexerHolder.get().Search(searchRequest) - if err != nil { - return err - } - batch := RepoIndexerBatch() - for _, hit := range result.Hits { - if err = batch.Delete(hit.ID); err != nil { - return err - } - } - return batch.Flush() -} - -// RepoSearchResult result of performing a search in a repo -type RepoSearchResult struct { - RepoID int64 - StartIndex int - EndIndex int - Filename string - Content string -} - -// SearchRepoByKeyword searches for files in the specified repo. -// Returns the matching file-paths -func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (int64, []*RepoSearchResult, error) { - phraseQuery := bleve.NewMatchPhraseQuery(keyword) - phraseQuery.FieldVal = "Content" - phraseQuery.Analyzer = repoIndexerAnalyzer - - var indexerQuery query.Query - if len(repoIDs) > 0 { - var repoQueries = make([]query.Query, 0, len(repoIDs)) - for _, repoID := range repoIDs { - repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) - } - - indexerQuery = bleve.NewConjunctionQuery( - bleve.NewDisjunctionQuery(repoQueries...), - phraseQuery, - ) - } else { - indexerQuery = phraseQuery - } - - from := (page - 1) * pageSize - searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) - searchRequest.Fields = []string{"Content", "RepoID"} - searchRequest.IncludeLocations = true - - result, err := indexerHolder.get().Search(searchRequest) - if err != nil { - return 0, nil, err - } - - searchResults := make([]*RepoSearchResult, len(result.Hits)) - for i, hit := range result.Hits { - var startIndex, endIndex int = -1, -1 - for _, locations := range hit.Locations["Content"] { - location := locations[0] - locationStart := int(location.Start) - locationEnd := int(location.End) - if startIndex < 0 || locationStart < startIndex { - startIndex = locationStart - } - if endIndex < 0 || locationEnd > endIndex { - endIndex = locationEnd - } - } - searchResults[i] = &RepoSearchResult{ - RepoID: int64(hit.Fields["RepoID"].(float64)), - StartIndex: startIndex, - EndIndex: endIndex, - Filename: filenameOfIndexerID(hit.ID), - Content: hit.Fields["Content"].(string), - } - } - return int64(result.Total), searchResults, nil -} diff --git a/modules/search/search.go b/modules/indexer/code/search.go index 531d95b187..18f193a532 100644 --- a/modules/search/search.go +++ b/modules/indexer/code/search.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package search +package code import ( "bytes" @@ -11,7 +11,6 @@ import ( "strings" "code.gitea.io/gitea/modules/highlight" - code_indexer "code.gitea.io/gitea/modules/indexer/code" "code.gitea.io/gitea/modules/util" ) @@ -60,7 +59,7 @@ func writeStrings(buf *bytes.Buffer, strs ...string) error { return nil } -func searchResult(result *code_indexer.RepoSearchResult, startIndex, endIndex int) (*Result, error) { +func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, error) { startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n") var formattedLinesBuffer bytes.Buffer @@ -113,7 +112,7 @@ func PerformSearch(repoIDs []int64, keyword string, page, pageSize int) (int, [] return 0, nil, nil } - total, results, err := code_indexer.SearchRepoByKeyword(repoIDs, keyword, page, pageSize) + total, results, err := indexer.Search(repoIDs, keyword, page, pageSize) if err != nil { return 0, nil, err } diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go index 7d4e389471..a758cfeaee 100644 --- a/modules/indexer/issues/db.go +++ b/modules/indexer/issues/db.go @@ -6,7 +6,7 @@ package issues import "code.gitea.io/gitea/models" -// DBIndexer implements Indexer inteface to use database's like search +// DBIndexer implements Indexer interface to use database's like search type DBIndexer struct { } diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 50b8d6d224..ebcd3f68dd 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -38,7 +38,7 @@ type SearchResult struct { Hits []Match } -// Indexer defines an inteface to indexer issues contents +// Indexer defines an interface to indexer issues contents type Indexer interface { Init() (bool, error) Index(issue []*IndexerData) error diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go index fbaef3fcf2..589e7bf864 100644 --- a/modules/setting/indexer.go +++ b/modules/setting/indexer.go @@ -45,6 +45,8 @@ var ( IssueQueueDir: "indexers/issues.queue", IssueQueueConnStr: "", IssueQueueBatchNumber: 20, + + MaxIndexerFileSize: 1024 * 1024, } ) |