aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/code/indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/indexer/code/indexer.go')
-rw-r--r--modules/indexer/code/indexer.go145
1 files changed, 47 insertions, 98 deletions
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index f38fd6000c..13d06874c9 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -7,86 +7,41 @@ import (
"context"
"os"
"runtime/pprof"
- "strconv"
- "strings"
+ "sync/atomic"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/indexer/code/bleve"
+ "code.gitea.io/gitea/modules/indexer/code/elasticsearch"
+ "code.gitea.io/gitea/modules/indexer/code/internal"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)
-// SearchResult result of performing a search in a repo
-type SearchResult struct {
- RepoID int64
- StartIndex int
- EndIndex int
- Filename string
- Content string
- CommitID string
- UpdatedUnix timeutil.TimeStamp
- Language string
- Color string
-}
-
-// SearchResultLanguages result of top languages count in search results
-type SearchResultLanguages struct {
- Language string
- Color string
- Count int
-}
-
-// Indexer defines an interface to index and search code contents
-type Indexer interface {
- Ping() bool
- Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
- Delete(repoID int64) error
- Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
- Close()
-}
-
-func filenameIndexerID(repoID int64, filename string) string {
- return indexerID(repoID) + "_" + filename
-}
-
-func indexerID(id int64) string {
- return strconv.FormatInt(id, 36)
-}
-
-func parseIndexerID(indexerID string) (int64, string) {
- index := strings.IndexByte(indexerID, '_')
- if index == -1 {
- log.Error("Unexpected ID in repo indexer: %s", indexerID)
- }
- repoID, _ := strconv.ParseInt(indexerID[:index], 36, 64)
- return repoID, indexerID[index+1:]
-}
-
-func filenameOfIndexerID(indexerID string) string {
- index := strings.IndexByte(indexerID, '_')
- if index == -1 {
- log.Error("Unexpected ID in repo indexer: %s", indexerID)
- }
- return indexerID[index+1:]
-}
+var (
+ indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData]
+ // globalIndexer is the global indexer, it cannot be nil.
+ // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready.
+ // So it's always safe use it as *globalIndexer.Load() and call its methods.
+ globalIndexer atomic.Pointer[internal.Indexer]
+ dummyIndexer *internal.Indexer
+)
-// IndexerData represents data stored in the code indexer
-type IndexerData struct {
- RepoID int64
+func init() {
+ i := internal.NewDummyIndexer()
+ dummyIndexer = &i
+ globalIndexer.Store(dummyIndexer)
}
-var indexerQueue *queue.WorkerPoolQueue[*IndexerData]
-
-func index(ctx context.Context, indexer Indexer, repoID int64) error {
+func index(ctx context.Context, indexer internal.Indexer, repoID int64) error {
repo, err := repo_model.GetRepositoryByID(ctx, repoID)
if repo_model.IsErrRepoNotExist(err) {
- return indexer.Delete(repoID)
+ return indexer.Delete(ctx, repoID)
}
if err != nil {
return err
@@ -139,7 +94,7 @@ func index(ctx context.Context, indexer Indexer, repoID int64) error {
// Init initialize the repo indexer
func Init() {
if !setting.Indexer.RepoIndexerEnabled {
- indexer.Close()
+ (*globalIndexer.Load()).Close()
return
}
@@ -153,7 +108,7 @@ func Init() {
}
cancel()
log.Debug("Closing repository indexer")
- indexer.Close()
+ (*globalIndexer.Load()).Close()
log.Info("PID: %d Repository Indexer closed", os.Getpid())
finished()
})
@@ -163,13 +118,8 @@ func Init() {
// Create the Queue
switch setting.Indexer.RepoType {
case "bleve", "elasticsearch":
- handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
- idx, err := indexer.get()
- if idx == nil || err != nil {
- log.Warn("Codes indexer handler: indexer is not ready, retry later.")
- return items
- }
-
+ handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
+ indexer := *globalIndexer.Load()
for _, indexerData := range items {
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
@@ -188,11 +138,7 @@ func Init() {
code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105)
*/
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
- if !idx.Ping() {
- log.Error("Code indexer handler: indexer is unavailable.")
- unhandled = append(unhandled, indexerData)
- continue
- }
+ unhandled = append(unhandled, indexerData)
if !setting.IsInTesting {
log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
}
@@ -213,8 +159,8 @@ func Init() {
pprof.SetGoroutineLabels(ctx)
start := time.Now()
var (
- rIndexer Indexer
- populate bool
+ rIndexer internal.Indexer
+ existed bool
err error
)
switch setting.Indexer.RepoType {
@@ -228,10 +174,11 @@ func Init() {
}
}()
- rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath)
+ rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath)
+ existed, err = rIndexer.Init(ctx)
if err != nil {
cancel()
- indexer.Close()
+ (*globalIndexer.Load()).Close()
close(waitChannel)
log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
}
@@ -245,23 +192,31 @@ func Init() {
}
}()
- rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName)
+ rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName)
+ if err != nil {
+ cancel()
+ (*globalIndexer.Load()).Close()
+ close(waitChannel)
+ log.Fatal("PID: %d Unable to create the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err)
+ }
+ existed, err = rIndexer.Init(ctx)
if err != nil {
cancel()
- indexer.Close()
+ (*globalIndexer.Load()).Close()
close(waitChannel)
log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err)
}
+
default:
log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType)
}
- indexer.set(rIndexer)
+ globalIndexer.Store(&rIndexer)
// Start processing the queue
go graceful.GetManager().RunWithCancel(indexerQueue)
- if populate {
+ if !existed { // populate the index because it's created for the first time
go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
}
select {
@@ -283,18 +238,18 @@ func Init() {
case <-graceful.GetManager().IsShutdown():
log.Warn("Shutdown before Repository Indexer completed initialization")
cancel()
- indexer.Close()
+ (*globalIndexer.Load()).Close()
case duration, ok := <-waitChannel:
if !ok {
log.Warn("Repository Indexer Initialization failed")
cancel()
- indexer.Close()
+ (*globalIndexer.Load()).Close()
return
}
log.Info("Repository Indexer Initialization took %v", duration)
case <-time.After(timeout):
cancel()
- indexer.Close()
+ (*globalIndexer.Load()).Close()
log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout)
}
}()
@@ -303,21 +258,15 @@ func Init() {
// UpdateRepoIndexer update a repository's entries in the indexer
func UpdateRepoIndexer(repo *repo_model.Repository) {
- indexData := &IndexerData{RepoID: repo.ID}
+ indexData := &internal.IndexerData{RepoID: repo.ID}
if err := indexerQueue.Push(indexData); err != nil {
log.Error("Update repo index data %v failed: %v", indexData, err)
}
}
// IsAvailable checks if issue indexer is available
-func IsAvailable() bool {
- idx, err := indexer.get()
- if err != nil {
- log.Error("IsAvailable(): unable to get indexer: %v", err)
- return false
- }
-
- return idx.Ping()
+func IsAvailable(ctx context.Context) bool {
+ return (*globalIndexer.Load()).Ping(ctx) == nil
}
// populateRepoIndexer populate the repo indexer with pre-existing data. This
@@ -368,7 +317,7 @@ func populateRepoIndexer(ctx context.Context) {
return
default:
}
- if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil {
+ if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil {
log.Error("indexerQueue.Push: %v", err)
return
}