aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues/indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r--modules/indexer/issues/indexer.go71
1 files changed, 32 insertions, 39 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 76ff80ffca..f36ea10935 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -102,7 +102,7 @@ var (
func InitIssueIndexer(syncReindex bool) {
ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
- waitChannel := make(chan time.Duration, 1)
+ indexerInitWaitChannel := make(chan time.Duration, 1)
// Create the Queue
switch setting.Indexer.IssueType {
@@ -110,7 +110,7 @@ func InitIssueIndexer(syncReindex bool) {
handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
indexer := holder.get()
if indexer == nil {
- log.Error("Issue indexer handler: unable to get indexer.")
+ log.Warn("Issue indexer handler: indexer is not ready, retry later.")
return items
}
toIndex := make([]*IndexerData, 0, len(items))
@@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) {
return unhandled
}
- issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)
+ issueIndexerQueue = queue.CreateSimpleQueue(ctx, "issue_indexer", handler)
if issueIndexerQueue == nil {
log.Fatal("Unable to create issue indexer queue")
}
default:
- issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
+ issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil)
}
+ graceful.GetManager().RunAtTerminate(finished)
+
// Create the Indexer
go func() {
pprof.SetGoroutineLabels(ctx)
@@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) {
if issueIndexer != nil {
issueIndexer.Close()
}
- finished()
log.Info("PID: %d Issue Indexer closed", os.Getpid())
})
log.Debug("Created Bleve Indexer")
case "elasticsearch":
- graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
- pprof.SetGoroutineLabels(ctx)
- issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
- if err != nil {
- log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- exist, err := issueIndexer.Init()
- if err != nil {
- log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- populate = !exist
- holder.set(issueIndexer)
- atTerminate(finished)
- })
+ issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
+ if err != nil {
+ log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
- graceful.GetManager().RunAtTerminate(finished)
case "meilisearch":
- graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
- pprof.SetGoroutineLabels(ctx)
- issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
- if err != nil {
- log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- exist, err := issueIndexer.Init()
- if err != nil {
- log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
- }
- populate = !exist
- holder.set(issueIndexer)
- atTerminate(finished)
- })
+ issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
+ if err != nil {
+ log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
default:
holder.cancel()
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
// Start processing the queue
- go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
+ go graceful.GetManager().RunWithCancel(issueIndexerQueue)
// Populate the index
if populate {
@@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) {
go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
}
}
- waitChannel <- time.Since(start)
- close(waitChannel)
+
+ indexerInitWaitChannel <- time.Since(start)
+ close(indexerInitWaitChannel)
}()
if syncReindex {
select {
- case <-waitChannel:
+ case <-indexerInitWaitChannel:
case <-graceful.GetManager().IsShutdown():
}
} else if setting.Indexer.StartupTimeout > 0 {
@@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) {
timeout += setting.GracefulHammerTime
}
select {
- case duration := <-waitChannel:
+ case duration := <-indexerInitWaitChannel:
log.Info("Issue Indexer Initialization took %v", duration)
case <-graceful.GetManager().IsShutdown():
log.Warn("Shutdown occurred before issue index initialisation was complete")