aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLunny Xiao <xiaolunwen@gmail.com>2020-09-07 23:05:08 +0800
committerGitHub <noreply@github.com>2020-09-07 23:05:08 +0800
commit91e7ad569ac9590b521e5c4fdfb2162f528db49f (patch)
tree675316438ca6d716b87849dcf23f5ecd772176d5
parenta722dd72db7fb6efd9ec0bf730d6b2364ef6337c (diff)
downloadgitea-91e7ad569ac9590b521e5c4fdfb2162f528db49f.tar.gz
gitea-91e7ad569ac9590b521e5c4fdfb2162f528db49f.zip
Add queue for code indexer (#10332)
* Add queue for code indexer * Fix lint * Fix test * Fix lint * Fix bug * Fix bug * Fix lint * Add noqueue * Fix tests * Rename noqueue to immediate
-rw-r--r--integrations/mssql.ini.tmpl3
-rw-r--r--integrations/mysql.ini.tmpl3
-rw-r--r--integrations/mysql8.ini.tmpl3
-rw-r--r--integrations/pgsql.ini.tmpl3
-rw-r--r--integrations/repo_search_test.go13
-rw-r--r--integrations/sqlite.ini.tmpl3
-rw-r--r--modules/indexer/code/elastic_search.go5
-rw-r--r--modules/indexer/code/indexer.go158
-rw-r--r--modules/indexer/code/queue.go154
-rw-r--r--modules/queue/queue.go59
10 files changed, 233 insertions, 171 deletions
diff --git a/integrations/mssql.ini.tmpl b/integrations/mssql.ini.tmpl
index a8fbbe7fe5..cfb3594126 100644
--- a/integrations/mssql.ini.tmpl
+++ b/integrations/mssql.ini.tmpl
@@ -14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mssql/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-mssql/repos.bleve
+[queue.code_indexer]
+TYPE = immediate
+
[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mssql/gitea-repositories
diff --git a/integrations/mysql.ini.tmpl b/integrations/mysql.ini.tmpl
index 5691311660..8e3d2b3f14 100644
--- a/integrations/mysql.ini.tmpl
+++ b/integrations/mysql.ini.tmpl
@@ -16,6 +16,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-mysql/repos.bleve
+[queue.code_indexer]
+TYPE = immediate
+
[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql/gitea-repositories
diff --git a/integrations/mysql8.ini.tmpl b/integrations/mysql8.ini.tmpl
index a135ecb981..ca77babf4b 100644
--- a/integrations/mysql8.ini.tmpl
+++ b/integrations/mysql8.ini.tmpl
@@ -14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql8/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-mysql8/repos.bleve
+[queue.code_indexer]
+TYPE = immediate
+
[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql8/gitea-repositories
diff --git a/integrations/pgsql.ini.tmpl b/integrations/pgsql.ini.tmpl
index 4cac2585fb..802296cf63 100644
--- a/integrations/pgsql.ini.tmpl
+++ b/integrations/pgsql.ini.tmpl
@@ -15,6 +15,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-pgsql/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-pgsql/repos.bleve
+[queue.code_indexer]
+TYPE = immediate
+
[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-pgsql/gitea-repositories
diff --git a/integrations/repo_search_test.go b/integrations/repo_search_test.go
index 701013735c..6f2ee37460 100644
--- a/integrations/repo_search_test.go
+++ b/integrations/repo_search_test.go
@@ -7,7 +7,6 @@ package integrations
import (
"net/http"
"testing"
- "time"
"code.gitea.io/gitea/models"
code_indexer "code.gitea.io/gitea/modules/indexer/code"
@@ -62,14 +61,6 @@ func testSearch(t *testing.T, url string, expected []string) {
assert.EqualValues(t, expected, filenames)
}
-func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository, ...chan<- error)) {
- waiter := make(chan error, 1)
- op(repo, waiter)
-
- select {
- case err := <-waiter:
- assert.NoError(t, err)
- case <-time.After(1 * time.Minute):
- assert.Fail(t, "Repository indexer took too long")
- }
+func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository)) {
+ op(repo)
}
diff --git a/integrations/sqlite.ini.tmpl b/integrations/sqlite.ini.tmpl
index e899328c81..5d54c5f9fa 100644
--- a/integrations/sqlite.ini.tmpl
+++ b/integrations/sqlite.ini.tmpl
@@ -10,6 +10,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-sqlite/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-sqlite/repos.bleve
+[queue.code_indexer]
+TYPE = immediate
+
[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-sqlite/gitea-repositories
diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go
index 4f690ed806..db36c5e0c4 100644
--- a/modules/indexer/code/elastic_search.go
+++ b/modules/indexer/code/elastic_search.go
@@ -168,6 +168,11 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
}
func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
+ // Ignore vendored files in code search
+ if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
+ return nil, nil
+ }
+
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
RunInDir(repo.RepoPath())
if err != nil {
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 468955cd89..5456373398 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)
@@ -38,7 +39,7 @@ type SearchResultLanguages struct {
Count int
}
-// Indexer defines an interface to indexer issues contents
+// Indexer defines an interface to index and search code contents
type Indexer interface {
Index(repo *models.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
@@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string {
return indexerID[index+1:]
}
+// IndexerData represents data stored in the code indexer
+type IndexerData struct {
+ RepoID int64
+ IsDelete bool
+}
+
+var (
+ indexerQueue queue.Queue
+)
+
+func index(indexer Indexer, repoID int64) error {
+ repo, err := models.GetRepositoryByID(repoID)
+ if err != nil {
+ return err
+ }
+
+ sha, err := getDefaultBranchSha(repo)
+ if err != nil {
+ return err
+ }
+ changes, err := getRepoChanges(repo, sha)
+ if err != nil {
+ return err
+ } else if changes == nil {
+ return nil
+ }
+
+ if err := indexer.Index(repo, sha, changes); err != nil {
+ return err
+ }
+
+ return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
+}
+
// Init initialize the repo indexer
func Init() {
if !setting.Indexer.RepoIndexerEnabled {
@@ -74,8 +109,6 @@ func Init() {
return
}
- initQueue(setting.Indexer.UpdateQueueLength)
-
ctx, cancel := context.WithCancel(context.Background())
graceful.GetManager().RunAtTerminate(ctx, func() {
@@ -85,6 +118,46 @@ func Init() {
})
waitChannel := make(chan time.Duration)
+
+ // Create the Queue
+ switch setting.Indexer.RepoType {
+ case "bleve", "elasticsearch":
+ handler := func(data ...queue.Data) {
+ idx, err := indexer.get()
+ if idx == nil || err != nil {
+ log.Error("Codes indexer handler: unable to get indexer!")
+ return
+ }
+
+ for _, datum := range data {
+ indexerData, ok := datum.(*IndexerData)
+ if !ok {
+ log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+ continue
+ }
+ log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete)
+
+ if indexerData.IsDelete {
+ if err := indexer.Delete(indexerData.RepoID); err != nil {
+ log.Error("indexer.Delete: %v", err)
+ }
+ } else {
+ if err := index(indexer, indexerData.RepoID); err != nil {
+ log.Error("index: %v", err)
+ continue
+ }
+ }
+ }
+ }
+
+ indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{})
+ if indexerQueue == nil {
+ log.Fatal("Unable to create codes indexer queue")
+ }
+ default:
+ log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType)
+ }
+
go func() {
start := time.Now()
var (
@@ -139,10 +212,11 @@ func Init() {
indexer.set(rIndexer)
- go processRepoIndexerOperationQueue(indexer)
+ // Start processing the queue
+ go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
if populate {
- go populateRepoIndexer()
+ go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
}
select {
case waitChannel <- time.Since(start):
@@ -179,3 +253,77 @@ func Init() {
}()
}
}
+
+// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
+func DeleteRepoFromIndexer(repo *models.Repository) {
+ indexData := &IndexerData{RepoID: repo.ID, IsDelete: true}
+ if err := indexerQueue.Push(indexData); err != nil {
+ log.Error("Delete repo index data %v failed: %v", indexData, err)
+ }
+}
+
+// UpdateRepoIndexer update a repository's entries in the indexer
+func UpdateRepoIndexer(repo *models.Repository) {
+ indexData := &IndexerData{RepoID: repo.ID}
+ if err := indexerQueue.Push(indexData); err != nil {
+ log.Error("Update repo index data %v failed: %v", indexData, err)
+ }
+}
+
+// populateRepoIndexer populate the repo indexer with pre-existing data. This
+// should only be run when the indexer is created for the first time.
+func populateRepoIndexer(ctx context.Context) {
+ log.Info("Populating the repo indexer with existing repositories")
+
+ exist, err := models.IsTableNotEmpty("repository")
+ if err != nil {
+ log.Fatal("System error: %v", err)
+ } else if !exist {
+ return
+ }
+
+ // if there is any existing repo indexer metadata in the DB, delete it
+ // since we are starting afresh. Also, xorm requires deletes to have a
+ // condition, and we want to delete everything, thus 1=1.
+ if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
+ log.Fatal("System error: %v", err)
+ }
+
+ var maxRepoID int64
+ if maxRepoID, err = models.GetMaxID("repository"); err != nil {
+ log.Fatal("System error: %v", err)
+ }
+
+ // start with the maximum existing repo ID and work backwards, so that we
+ // don't include repos that are created after gitea starts; such repos will
+ // already be added to the indexer, and we don't need to add them again.
+ for maxRepoID > 0 {
+ select {
+ case <-ctx.Done():
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
+ ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
+ if err != nil {
+ log.Error("populateRepoIndexer: %v", err)
+ return
+ } else if len(ids) == 0 {
+ break
+ }
+ for _, id := range ids {
+ select {
+ case <-ctx.Done():
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
+ if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil {
+ log.Error("indexerQueue.Push: %v", err)
+ return
+ }
+ maxRepoID = id - 1
+ }
+ }
+ log.Info("Done (re)populating the repo indexer with existing repositories")
+}
diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go
deleted file mode 100644
index 844003e1fc..0000000000
--- a/modules/indexer/code/queue.go
+++ /dev/null
@@ -1,154 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package code
-
-import (
- "os"
-
- "code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/graceful"
- "code.gitea.io/gitea/modules/log"
-)
-
-type repoIndexerOperation struct {
- repoID int64
- deleted bool
- watchers []chan<- error
-}
-
-var repoIndexerOperationQueue chan repoIndexerOperation
-
-func initQueue(queueLength int) {
- repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
-}
-
-func index(indexer Indexer, repoID int64) error {
- repo, err := models.GetRepositoryByID(repoID)
- if err != nil {
- return err
- }
-
- sha, err := getDefaultBranchSha(repo)
- if err != nil {
- return err
- }
- changes, err := getRepoChanges(repo, sha)
- if err != nil {
- return err
- } else if changes == nil {
- return nil
- }
-
- if err := indexer.Index(repo, sha, changes); err != nil {
- return err
- }
-
- return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
-}
-
-func processRepoIndexerOperationQueue(indexer Indexer) {
- for {
- select {
- case op := <-repoIndexerOperationQueue:
- var err error
- if op.deleted {
- if err = indexer.Delete(op.repoID); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- } else {
- if err = index(indexer, op.repoID); err != nil {
- log.Error("indexer.Index: %v", err)
- }
- }
- for _, watcher := range op.watchers {
- watcher <- err
- }
- case <-graceful.GetManager().IsShutdown():
- log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
- return
- }
- }
-}
-
-// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
-func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) {
- addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers})
-}
-
-// UpdateRepoIndexer update a repository's entries in the indexer
-func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
- addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers})
-}
-
-func addOperationToQueue(op repoIndexerOperation) {
- select {
- case repoIndexerOperationQueue <- op:
- break
- default:
- go func() {
- repoIndexerOperationQueue <- op
- }()
- }
-}
-
-// populateRepoIndexer populate the repo indexer with pre-existing data. This
-// should only be run when the indexer is created for the first time.
-func populateRepoIndexer() {
- log.Info("Populating the repo indexer with existing repositories")
-
- isShutdown := graceful.GetManager().IsShutdown()
-
- exist, err := models.IsTableNotEmpty("repository")
- if err != nil {
- log.Fatal("System error: %v", err)
- } else if !exist {
- return
- }
-
- // if there is any existing repo indexer metadata in the DB, delete it
- // since we are starting afresh. Also, xorm requires deletes to have a
- // condition, and we want to delete everything, thus 1=1.
- if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
- log.Fatal("System error: %v", err)
- }
-
- var maxRepoID int64
- if maxRepoID, err = models.GetMaxID("repository"); err != nil {
- log.Fatal("System error: %v", err)
- }
-
- // start with the maximum existing repo ID and work backwards, so that we
- // don't include repos that are created after gitea starts; such repos will
- // already be added to the indexer, and we don't need to add them again.
- for maxRepoID > 0 {
- select {
- case <-isShutdown:
- log.Info("Repository Indexer population shutdown before completion")
- return
- default:
- }
- ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
- if err != nil {
- log.Error("populateRepoIndexer: %v", err)
- return
- } else if len(ids) == 0 {
- break
- }
- for _, id := range ids {
- select {
- case <-isShutdown:
- log.Info("Repository Indexer population shutdown before completion")
- return
- default:
- }
- repoIndexerOperationQueue <- repoIndexerOperation{
- repoID: id,
- deleted: false,
- }
- maxRepoID = id - 1
- }
- }
- log.Info("Done (re)populating the repo indexer with existing repositories")
-}
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index e3c63310be..d08cba35a1 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -106,7 +106,64 @@ func (*DummyQueue) IsEmpty() bool {
return true
}
-var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
+// ImmediateType is the type to execute the function when push
+const ImmediateType Type = "immediate"
+
+// NewImmediate creates a new false queue to execute the function when push
+func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
+ return &Immediate{
+ handler: handler,
+ }, nil
+}
+
+// Immediate represents an direct execution queue
+type Immediate struct {
+ handler HandlerFunc
+}
+
+// Run does nothing
+func (*Immediate) Run(_, _ func(context.Context, func())) {}
+
+// Push fakes a push of data to the queue
+func (q *Immediate) Push(data Data) error {
+ return q.PushFunc(data, nil)
+}
+
+// PushFunc fakes a push of data to the queue with a function. The function is never run.
+func (q *Immediate) PushFunc(data Data, f func() error) error {
+ if f != nil {
+ if err := f(); err != nil {
+ return err
+ }
+ }
+ q.handler(data)
+ return nil
+}
+
+// Has always returns false as this queue never does anything
+func (*Immediate) Has(Data) (bool, error) {
+ return false, nil
+}
+
+// Flush always returns nil
+func (*Immediate) Flush(time.Duration) error {
+ return nil
+}
+
+// FlushWithContext always returns nil
+func (*Immediate) FlushWithContext(context.Context) error {
+ return nil
+}
+
+// IsEmpty asserts that the queue is empty
+func (*Immediate) IsEmpty() bool {
+ return true
+}
+
+var queuesMap = map[Type]NewQueueFunc{
+ DummyQueueType: NewDummyQueue,
+ ImmediateType: NewImmediate,
+}
// RegisteredTypes provides the list of requested types of queues
func RegisteredTypes() []Type {