]> source.dussan.org Git - gitea.git/commitdiff
Wrap the code indexer (#9476)
authorzeripath <art27@cantab.net>
Tue, 24 Dec 2019 07:26:34 +0000 (07:26 +0000)
committerLunny Xiao <xiaolunwen@gmail.com>
Tue, 24 Dec 2019 07:26:34 +0000 (15:26 +0800)
* Wrap the code indexer

In order to prevent a data race in the code indexer it must be wrapped
with a holder otherwise it is possible to Search/Index on an
incompletely initialised indexer, and search will fail with a nil
pointer until the repository indexer is initialised.

Further a completely initialised repository indexer should not be closed
until Termination otherwise actions in Hammer/Shutdown phases could
block or be lost.

Finally, there is a complex dance of shutdown etiquette should the index
initialisation fail. This PR restores that.

* Always return err if closed whilst waiting

Co-authored-by: techknowlogick <matti@mdranta.net>
modules/indexer/code/indexer.go
modules/indexer/code/queue.go
modules/indexer/code/wrapped.go [new file with mode: 0644]

index c68c7c2d71c3f5a8df8474202ebd328143bc864a..04d556f541f9de80053c6c724ebb38a211f0602a 100644 (file)
@@ -5,6 +5,8 @@
 package code
 
 import (
+       "context"
+       "os"
        "time"
 
        "code.gitea.io/gitea/modules/graceful"
@@ -12,10 +14,6 @@ import (
        "code.gitea.io/gitea/modules/setting"
 )
 
-var (
-       indexer Indexer
-)
-
 // SearchResult result of performing a search in a repo
 type SearchResult struct {
        RepoID     int64
@@ -36,28 +34,45 @@ type Indexer interface {
 // Init initialize the repo indexer
 func Init() {
        if !setting.Indexer.RepoIndexerEnabled {
+               indexer.Close()
                return
        }
 
+       ctx, cancel := context.WithCancel(context.Background())
+
+       graceful.GetManager().RunAtTerminate(ctx, func() {
+               log.Debug("Closing repository indexer")
+               indexer.Close()
+               log.Info("PID: %d Repository Indexer closed", os.Getpid())
+       })
+
        waitChannel := make(chan time.Duration)
        go func() {
                start := time.Now()
-               log.Info("Initializing Repository Indexer")
-               var created bool
-               var err error
-               indexer, created, err = NewBleveIndexer(setting.Indexer.RepoPath)
+               log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath)
+               bleveIndexer, created, err := NewBleveIndexer(setting.Indexer.RepoPath)
                if err != nil {
+                       if bleveIndexer != nil {
+                               bleveIndexer.Close()
+                       }
+                       cancel()
                        indexer.Close()
-                       log.Fatal("indexer.Init: %v", err)
+                       close(waitChannel)
+                       log.Fatal("PID: %d Unable to initialize the Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
                }
+               indexer.set(bleveIndexer)
 
                go processRepoIndexerOperationQueue(indexer)
 
                if created {
                        go populateRepoIndexer()
                }
+               select {
+               case waitChannel <- time.Since(start):
+               case <-graceful.GetManager().IsShutdown():
+               }
 
-               waitChannel <- time.Since(start)
+               close(waitChannel)
        }()
 
        if setting.Indexer.StartupTimeout > 0 {
@@ -67,9 +82,21 @@ func Init() {
                                timeout += setting.GracefulHammerTime
                        }
                        select {
-                       case duration := <-waitChannel:
+                       case <-graceful.GetManager().IsShutdown():
+                               log.Warn("Shutdown before Repository Indexer completed initialization")
+                               cancel()
+                               indexer.Close()
+                       case duration, ok := <-waitChannel:
+                               if !ok {
+                                       log.Warn("Repository Indexer Initialization failed")
+                                       cancel()
+                                       indexer.Close()
+                                       return
+                               }
                                log.Info("Repository Indexer Initialization took %v", duration)
                        case <-time.After(timeout):
+                               cancel()
+                               indexer.Close()
                                log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout)
                        }
                }()
index 90f7d1bb19110ec7158b4eb1ad77e7abc1a73790..82cd8ded53937dcef5bdbdd31fbda6a006232d15 100644 (file)
@@ -22,8 +22,6 @@ type repoIndexerOperation struct {
 var repoIndexerOperationQueue chan repoIndexerOperation
 
 func processRepoIndexerOperationQueue(indexer Indexer) {
-       defer indexer.Close()
-
        repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
        for {
                select {
diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go
new file mode 100644 (file)
index 0000000..6a20883
--- /dev/null
@@ -0,0 +1,94 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package code
+
+import (
+       "fmt"
+       "sync"
+)
+
+var (
+       indexer = newWrappedIndexer()
+)
+
+// ErrWrappedIndexerClosed is the error returned if the indexer was closed before it was ready
+var ErrWrappedIndexerClosed = fmt.Errorf("Indexer closed before ready")
+
+type wrappedIndexer struct {
+       internal Indexer
+       lock     sync.RWMutex
+       cond     *sync.Cond
+       closed   bool
+}
+
+func newWrappedIndexer() *wrappedIndexer {
+       w := &wrappedIndexer{}
+       w.cond = sync.NewCond(w.lock.RLocker())
+       return w
+}
+
+func (w *wrappedIndexer) set(indexer Indexer) {
+       w.lock.Lock()
+       defer w.lock.Unlock()
+       if w.closed {
+               // Too late!
+               indexer.Close()
+       }
+       w.internal = indexer
+       w.cond.Broadcast()
+}
+
+func (w *wrappedIndexer) get() (Indexer, error) {
+       w.lock.RLock()
+       defer w.lock.RUnlock()
+       if w.internal == nil {
+               if w.closed {
+                       return nil, ErrWrappedIndexerClosed
+               }
+               w.cond.Wait()
+               if w.closed {
+                       return nil, ErrWrappedIndexerClosed
+               }
+       }
+       return w.internal, nil
+}
+
+func (w *wrappedIndexer) Index(repoID int64) error {
+       indexer, err := w.get()
+       if err != nil {
+               return err
+       }
+       return indexer.Index(repoID)
+}
+
+func (w *wrappedIndexer) Delete(repoID int64) error {
+       indexer, err := w.get()
+       if err != nil {
+               return err
+       }
+       return indexer.Delete(repoID)
+}
+
+func (w *wrappedIndexer) Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) {
+       indexer, err := w.get()
+       if err != nil {
+               return 0, nil, err
+       }
+       return indexer.Search(repoIDs, keyword, page, pageSize)
+
+}
+
+func (w *wrappedIndexer) Close() {
+       w.lock.Lock()
+       defer w.lock.Unlock()
+       if w.closed {
+               return
+       }
+       w.closed = true
+       w.cond.Broadcast()
+       if w.internal != nil {
+               w.internal.Close()
+       }
+}