diff options
Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r-- | modules/indexer/issues/indexer.go | 71 |
1 files changed, 32 insertions, 39 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 76ff80ffca..f36ea10935 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -102,7 +102,7 @@ var ( func InitIssueIndexer(syncReindex bool) { ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false) - waitChannel := make(chan time.Duration, 1) + indexerInitWaitChannel := make(chan time.Duration, 1) // Create the Queue switch setting.Indexer.IssueType { @@ -110,7 +110,7 @@ func InitIssueIndexer(syncReindex bool) { handler := func(items ...*IndexerData) (unhandled []*IndexerData) { indexer := holder.get() if indexer == nil { - log.Error("Issue indexer handler: unable to get indexer.") + log.Warn("Issue indexer handler: indexer is not ready, retry later.") return items } toIndex := make([]*IndexerData, 0, len(items)) @@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) { return unhandled } - issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler) + issueIndexerQueue = queue.CreateSimpleQueue(ctx, "issue_indexer", handler) if issueIndexerQueue == nil { log.Fatal("Unable to create issue indexer queue") } default: - issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil) + issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil) } + graceful.GetManager().RunAtTerminate(finished) + // Create the Indexer go func() { pprof.SetGoroutineLabels(ctx) @@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) { if issueIndexer != nil { issueIndexer.Close() } - finished() log.Info("PID: %d Issue Indexer closed", os.Getpid()) }) log.Debug("Created Bleve Indexer") case "elasticsearch": - graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { - pprof.SetGoroutineLabels(ctx) - issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() - if err != nil { - log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) - } - populate = !exist - holder.set(issueIndexer) - atTerminate(finished) - }) + issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) + if err != nil { + log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) + } + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) + } + populate = !exist + holder.set(issueIndexer) case "db": issueIndexer := &DBIndexer{} holder.set(issueIndexer) - graceful.GetManager().RunAtTerminate(finished) case "meilisearch": - graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { - pprof.SetGoroutineLabels(ctx) - issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) - if err != nil { - log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) - } - exist, err := issueIndexer.Init() - if err != nil { - log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) - } - populate = !exist - holder.set(issueIndexer) - atTerminate(finished) - }) + issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) + if err != nil { + log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) + } + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) + } + populate = !exist + holder.set(issueIndexer) default: holder.cancel() log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } // Start processing the queue - go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run) + go graceful.GetManager().RunWithCancel(issueIndexerQueue) // Populate the index if populate { @@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) { go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } } - waitChannel <- time.Since(start) - close(waitChannel) + + indexerInitWaitChannel <- time.Since(start) + close(indexerInitWaitChannel) }() if syncReindex { select { - case <-waitChannel: + case <-indexerInitWaitChannel: case <-graceful.GetManager().IsShutdown(): } } else if setting.Indexer.StartupTimeout > 0 { @@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) { timeout += setting.GracefulHammerTime } select { - case duration := <-waitChannel: + case duration := <-indexerInitWaitChannel: log.Info("Issue Indexer Initialization took %v", duration) case <-graceful.GetManager().IsShutdown(): log.Warn("Shutdown occurred before issue index initialisation was complete") |