summaryrefslogtreecommitdiffstats
path: root/modules/indexer/code
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2020-09-07 23:05:08 +0800
committerGitHub <noreply@github.com>2020-09-07 23:05:08 +0800
commit91e7ad569ac9590b521e5c4fdfb2162f528db49f (patch)
tree675316438ca6d716b87849dcf23f5ecd772176d5 /modules/indexer/code
parenta722dd72db7fb6efd9ec0bf730d6b2364ef6337c (diff)
downloadgitea-91e7ad569ac9590b521e5c4fdfb2162f528db49f.tar.gz
gitea-91e7ad569ac9590b521e5c4fdfb2162f528db49f.zip
Add queue for code indexer (#10332)
* Add queue for code indexer * Fix lint * Fix test * Fix lint * Fix bug * Fix bug * Fix lint * Add noqueue * Fix tests * Rename noqueue to immediate
Diffstat (limited to 'modules/indexer/code')
-rw-r--r--modules/indexer/code/elastic_search.go5
-rw-r--r--modules/indexer/code/indexer.go158
-rw-r--r--modules/indexer/code/queue.go154
3 files changed, 158 insertions, 159 deletions
diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go
index 4f690ed806..db36c5e0c4 100644
--- a/modules/indexer/code/elastic_search.go
+++ b/modules/indexer/code/elastic_search.go
@@ -168,6 +168,11 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
}
func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
+ // Ignore vendored files in code search
+ if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
+ return nil, nil
+ }
+
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
RunInDir(repo.RepoPath())
if err != nil {
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 468955cd89..5456373398 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)
@@ -38,7 +39,7 @@ type SearchResultLanguages struct {
Count int
}
-// Indexer defines an interface to indexer issues contents
+// Indexer defines an interface to index and search code contents
type Indexer interface {
Index(repo *models.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
@@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string {
return indexerID[index+1:]
}
+// IndexerData represents data stored in the code indexer
+type IndexerData struct {
+ RepoID int64
+ IsDelete bool
+}
+
+var (
+ indexerQueue queue.Queue
+)
+
+func index(indexer Indexer, repoID int64) error {
+ repo, err := models.GetRepositoryByID(repoID)
+ if err != nil {
+ return err
+ }
+
+ sha, err := getDefaultBranchSha(repo)
+ if err != nil {
+ return err
+ }
+ changes, err := getRepoChanges(repo, sha)
+ if err != nil {
+ return err
+ } else if changes == nil {
+ return nil
+ }
+
+ if err := indexer.Index(repo, sha, changes); err != nil {
+ return err
+ }
+
+ return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
+}
+
// Init initialize the repo indexer
func Init() {
if !setting.Indexer.RepoIndexerEnabled {
@@ -74,8 +109,6 @@ func Init() {
return
}
- initQueue(setting.Indexer.UpdateQueueLength)
-
ctx, cancel := context.WithCancel(context.Background())
graceful.GetManager().RunAtTerminate(ctx, func() {
@@ -85,6 +118,46 @@ func Init() {
})
waitChannel := make(chan time.Duration)
+
+ // Create the Queue
+ switch setting.Indexer.RepoType {
+ case "bleve", "elasticsearch":
+ handler := func(data ...queue.Data) {
+ idx, err := indexer.get()
+ if idx == nil || err != nil {
+ log.Error("Codes indexer handler: unable to get indexer!")
+ return
+ }
+
+ for _, datum := range data {
+ indexerData, ok := datum.(*IndexerData)
+ if !ok {
+ log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+ continue
+ }
+ log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete)
+
+ if indexerData.IsDelete {
+ if err := indexer.Delete(indexerData.RepoID); err != nil {
+ log.Error("indexer.Delete: %v", err)
+ }
+ } else {
+ if err := index(indexer, indexerData.RepoID); err != nil {
+ log.Error("index: %v", err)
+ continue
+ }
+ }
+ }
+ }
+
+ indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{})
+ if indexerQueue == nil {
+ log.Fatal("Unable to create codes indexer queue")
+ }
+ default:
+ log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType)
+ }
+
go func() {
start := time.Now()
var (
@@ -139,10 +212,11 @@ func Init() {
indexer.set(rIndexer)
- go processRepoIndexerOperationQueue(indexer)
+ // Start processing the queue
+ go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
if populate {
- go populateRepoIndexer()
+ go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
}
select {
case waitChannel <- time.Since(start):
@@ -179,3 +253,77 @@ func Init() {
}()
}
}
+
+// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
+func DeleteRepoFromIndexer(repo *models.Repository) {
+ indexData := &IndexerData{RepoID: repo.ID, IsDelete: true}
+ if err := indexerQueue.Push(indexData); err != nil {
+ log.Error("Delete repo index data %v failed: %v", indexData, err)
+ }
+}
+
+// UpdateRepoIndexer update a repository's entries in the indexer
+func UpdateRepoIndexer(repo *models.Repository) {
+ indexData := &IndexerData{RepoID: repo.ID}
+ if err := indexerQueue.Push(indexData); err != nil {
+ log.Error("Update repo index data %v failed: %v", indexData, err)
+ }
+}
+
+// populateRepoIndexer populate the repo indexer with pre-existing data. This
+// should only be run when the indexer is created for the first time.
+func populateRepoIndexer(ctx context.Context) {
+ log.Info("Populating the repo indexer with existing repositories")
+
+ exist, err := models.IsTableNotEmpty("repository")
+ if err != nil {
+ log.Fatal("System error: %v", err)
+ } else if !exist {
+ return
+ }
+
+ // if there is any existing repo indexer metadata in the DB, delete it
+ // since we are starting afresh. Also, xorm requires deletes to have a
+ // condition, and we want to delete everything, thus 1=1.
+ if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
+ log.Fatal("System error: %v", err)
+ }
+
+ var maxRepoID int64
+ if maxRepoID, err = models.GetMaxID("repository"); err != nil {
+ log.Fatal("System error: %v", err)
+ }
+
+ // start with the maximum existing repo ID and work backwards, so that we
+ // don't include repos that are created after gitea starts; such repos will
+ // already be added to the indexer, and we don't need to add them again.
+ for maxRepoID > 0 {
+ select {
+ case <-ctx.Done():
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
+ ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
+ if err != nil {
+ log.Error("populateRepoIndexer: %v", err)
+ return
+ } else if len(ids) == 0 {
+ break
+ }
+ for _, id := range ids {
+ select {
+ case <-ctx.Done():
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
+ if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil {
+ log.Error("indexerQueue.Push: %v", err)
+ return
+ }
+ maxRepoID = id - 1
+ }
+ }
+ log.Info("Done (re)populating the repo indexer with existing repositories")
+}
diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go
deleted file mode 100644
index 844003e1fc..0000000000
--- a/modules/indexer/code/queue.go
+++ /dev/null
@@ -1,154 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package code
-
-import (
- "os"
-
- "code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/graceful"
- "code.gitea.io/gitea/modules/log"
-)
-
-type repoIndexerOperation struct {
- repoID int64
- deleted bool
- watchers []chan<- error
-}
-
-var repoIndexerOperationQueue chan repoIndexerOperation
-
-func initQueue(queueLength int) {
- repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
-}
-
-func index(indexer Indexer, repoID int64) error {
- repo, err := models.GetRepositoryByID(repoID)
- if err != nil {
- return err
- }
-
- sha, err := getDefaultBranchSha(repo)
- if err != nil {
- return err
- }
- changes, err := getRepoChanges(repo, sha)
- if err != nil {
- return err
- } else if changes == nil {
- return nil
- }
-
- if err := indexer.Index(repo, sha, changes); err != nil {
- return err
- }
-
- return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
-}
-
-func processRepoIndexerOperationQueue(indexer Indexer) {
- for {
- select {
- case op := <-repoIndexerOperationQueue:
- var err error
- if op.deleted {
- if err = indexer.Delete(op.repoID); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- } else {
- if err = index(indexer, op.repoID); err != nil {
- log.Error("indexer.Index: %v", err)
- }
- }
- for _, watcher := range op.watchers {
- watcher <- err
- }
- case <-graceful.GetManager().IsShutdown():
- log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
- return
- }
- }
-}
-
-// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
-func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) {
- addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers})
-}
-
-// UpdateRepoIndexer update a repository's entries in the indexer
-func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
- addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers})
-}
-
-func addOperationToQueue(op repoIndexerOperation) {
- select {
- case repoIndexerOperationQueue <- op:
- break
- default:
- go func() {
- repoIndexerOperationQueue <- op
- }()
- }
-}
-
-// populateRepoIndexer populate the repo indexer with pre-existing data. This
-// should only be run when the indexer is created for the first time.
-func populateRepoIndexer() {
- log.Info("Populating the repo indexer with existing repositories")
-
- isShutdown := graceful.GetManager().IsShutdown()
-
- exist, err := models.IsTableNotEmpty("repository")
- if err != nil {
- log.Fatal("System error: %v", err)
- } else if !exist {
- return
- }
-
- // if there is any existing repo indexer metadata in the DB, delete it
- // since we are starting afresh. Also, xorm requires deletes to have a
- // condition, and we want to delete everything, thus 1=1.
- if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
- log.Fatal("System error: %v", err)
- }
-
- var maxRepoID int64
- if maxRepoID, err = models.GetMaxID("repository"); err != nil {
- log.Fatal("System error: %v", err)
- }
-
- // start with the maximum existing repo ID and work backwards, so that we
- // don't include repos that are created after gitea starts; such repos will
- // already be added to the indexer, and we don't need to add them again.
- for maxRepoID > 0 {
- select {
- case <-isShutdown:
- log.Info("Repository Indexer population shutdown before completion")
- return
- default:
- }
- ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
- if err != nil {
- log.Error("populateRepoIndexer: %v", err)
- return
- } else if len(ids) == 0 {
- break
- }
- for _, id := range ids {
- select {
- case <-isShutdown:
- log.Info("Repository Indexer population shutdown before completion")
- return
- default:
- }
- repoIndexerOperationQueue <- repoIndexerOperation{
- repoID: id,
- deleted: false,
- }
- maxRepoID = id - 1
- }
- }
- log.Info("Done (re)populating the repo indexer with existing repositories")
-}