From ba526ceffe33a54b6015cdfbdc9bba920484dc23 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 15 May 2021 15:22:26 +0100 Subject: Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693) * move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton --- modules/indexer/code/indexer.go | 8 +++++++- modules/indexer/issues/indexer.go | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) (limited to 'modules/indexer') diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index a7d78e9fdc..67fa43eda8 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -115,7 +115,13 @@ func Init() { ctx, cancel := context.WithCancel(context.Background()) - graceful.GetManager().RunAtTerminate(ctx, func() { + graceful.GetManager().RunAtTerminate(func() { + select { + case <-ctx.Done(): + return + default: + } + cancel() log.Debug("Closing repository indexer") indexer.Close() log.Info("PID: %d Repository Indexer closed", os.Getpid()) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 9edaef6bdd..676b6686ea 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -160,7 +160,7 @@ func InitIssueIndexer(syncReindex bool) { } populate = !exist holder.set(issueIndexer) - graceful.GetManager().RunAtTerminate(context.Background(), func() { + graceful.GetManager().RunAtTerminate(func() { log.Debug("Closing issue indexer") issueIndexer := holder.get() if issueIndexer != nil { @@ -170,7 +170,7 @@ func InitIssueIndexer(syncReindex bool) { }) log.Debug("Created Bleve Indexer") case "elasticsearch": - graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) { + graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) if err != nil { log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) -- cgit v1.2.3