aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/code/indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/indexer/code/indexer.go')
-rw-r--r--modules/indexer/code/indexer.go158
1 files changed, 153 insertions, 5 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 468955cd89..5456373398 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)
@@ -38,7 +39,7 @@ type SearchResultLanguages struct {
Count int
}
-// Indexer defines an interface to indexer issues contents
+// Indexer defines an interface to index and search code contents
type Indexer interface {
Index(repo *models.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
@@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string {
return indexerID[index+1:]
}
+// IndexerData represents data stored in the code indexer
+type IndexerData struct {
+ RepoID int64
+ IsDelete bool
+}
+
+var (
+ indexerQueue queue.Queue
+)
+
+func index(indexer Indexer, repoID int64) error {
+ repo, err := models.GetRepositoryByID(repoID)
+ if err != nil {
+ return err
+ }
+
+ sha, err := getDefaultBranchSha(repo)
+ if err != nil {
+ return err
+ }
+ changes, err := getRepoChanges(repo, sha)
+ if err != nil {
+ return err
+ } else if changes == nil {
+ return nil
+ }
+
+ if err := indexer.Index(repo, sha, changes); err != nil {
+ return err
+ }
+
+ return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
+}
+
// Init initialize the repo indexer
func Init() {
if !setting.Indexer.RepoIndexerEnabled {
@@ -74,8 +109,6 @@ func Init() {
return
}
- initQueue(setting.Indexer.UpdateQueueLength)
-
ctx, cancel := context.WithCancel(context.Background())
graceful.GetManager().RunAtTerminate(ctx, func() {
@@ -85,6 +118,46 @@ func Init() {
})
waitChannel := make(chan time.Duration)
+
+ // Create the Queue
+ switch setting.Indexer.RepoType {
+ case "bleve", "elasticsearch":
+ handler := func(data ...queue.Data) {
+ idx, err := indexer.get()
+ if idx == nil || err != nil {
+ log.Error("Codes indexer handler: unable to get indexer!")
+ return
+ }
+
+ for _, datum := range data {
+ indexerData, ok := datum.(*IndexerData)
+ if !ok {
+ log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+ continue
+ }
+ log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete)
+
+ if indexerData.IsDelete {
+ if err := indexer.Delete(indexerData.RepoID); err != nil {
+ log.Error("indexer.Delete: %v", err)
+ }
+ } else {
+ if err := index(indexer, indexerData.RepoID); err != nil {
+ log.Error("index: %v", err)
+ continue
+ }
+ }
+ }
+ }
+
+ indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{})
+ if indexerQueue == nil {
+ log.Fatal("Unable to create codes indexer queue")
+ }
+ default:
+ log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType)
+ }
+
go func() {
start := time.Now()
var (
@@ -139,10 +212,11 @@ func Init() {
indexer.set(rIndexer)
- go processRepoIndexerOperationQueue(indexer)
+ // Start processing the queue
+ go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
if populate {
- go populateRepoIndexer()
+ go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
}
select {
case waitChannel <- time.Since(start):
@@ -179,3 +253,77 @@ func Init() {
}()
}
}
+
+// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
+func DeleteRepoFromIndexer(repo *models.Repository) {
+ indexData := &IndexerData{RepoID: repo.ID, IsDelete: true}
+ if err := indexerQueue.Push(indexData); err != nil {
+ log.Error("Delete repo index data %v failed: %v", indexData, err)
+ }
+}
+
+// UpdateRepoIndexer update a repository's entries in the indexer
+func UpdateRepoIndexer(repo *models.Repository) {
+ indexData := &IndexerData{RepoID: repo.ID}
+ if err := indexerQueue.Push(indexData); err != nil {
+ log.Error("Update repo index data %v failed: %v", indexData, err)
+ }
+}
+
+// populateRepoIndexer populate the repo indexer with pre-existing data. This
+// should only be run when the indexer is created for the first time.
+func populateRepoIndexer(ctx context.Context) {
+ log.Info("Populating the repo indexer with existing repositories")
+
+ exist, err := models.IsTableNotEmpty("repository")
+ if err != nil {
+ log.Fatal("System error: %v", err)
+ } else if !exist {
+ return
+ }
+
+ // if there is any existing repo indexer metadata in the DB, delete it
+ // since we are starting afresh. Also, xorm requires deletes to have a
+ // condition, and we want to delete everything, thus 1=1.
+ if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
+ log.Fatal("System error: %v", err)
+ }
+
+ var maxRepoID int64
+ if maxRepoID, err = models.GetMaxID("repository"); err != nil {
+ log.Fatal("System error: %v", err)
+ }
+
+ // start with the maximum existing repo ID and work backwards, so that we
+ // don't include repos that are created after gitea starts; such repos will
+ // already be added to the indexer, and we don't need to add them again.
+ for maxRepoID > 0 {
+ select {
+ case <-ctx.Done():
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
+ ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
+ if err != nil {
+ log.Error("populateRepoIndexer: %v", err)
+ return
+ } else if len(ids) == 0 {
+ break
+ }
+ for _, id := range ids {
+ select {
+ case <-ctx.Done():
+ log.Info("Repository Indexer population shutdown before completion")
+ return
+ default:
+ }
+ if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil {
+ log.Error("indexerQueue.Push: %v", err)
+ return
+ }
+ maxRepoID = id - 1
+ }
+ }
+ log.Info("Done (re)populating the repo indexer with existing repositories")
+}