diff options
author | zeripath <art27@cantab.net> | 2019-10-15 14:39:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-15 14:39:51 +0100 |
commit | 167e8f18da3aadcdcdd7bb8c488c39d73ac65803 (patch) | |
tree | c2ad32fc8ced5657f62034551e72134a0a238fcb /modules/indexer/repo.go | |
parent | 4a290bd64cd4c4ba77b9f3c4908a76cc521f9621 (diff) | |
download | gitea-167e8f18da3aadcdcdd7bb8c488c39d73ac65803.tar.gz gitea-167e8f18da3aadcdcdd7bb8c488c39d73ac65803.zip |
Restore Graceful Restarting & Socket Activation (#7274)
* Prevent deadlock in indexer initialisation during graceful restart
* Move from gracehttp to our own service to add graceful ssh
* Add timeout for start of indexers and make hammer time configurable
* Fix issue with re-initialization in indexer during tests
* move the code to detect use of closed to graceful
* Handle logs gracefully - add a pid suffix just before restart
* Move to using a cond and a holder for indexers
* use time.Since
* Add some comments and attribution
* update modules.txt
* Use zero to disable timeout
* Move RestartProcess to its own file
* Add cleanup routine
Diffstat (limited to 'modules/indexer/repo.go')
-rw-r--r-- | modules/indexer/repo.go | 52 |
1 files changed, 41 insertions, 11 deletions
diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go index 91ed173aa7..841f29acd7 100644 --- a/modules/indexer/repo.go +++ b/modules/indexer/repo.go @@ -6,6 +6,7 @@ package indexer import ( "strings" + "sync" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -25,8 +26,36 @@ const ( repoIndexerLatestVersion = 4 ) +type bleveIndexerHolder struct { + index bleve.Index + mutex sync.RWMutex + cond *sync.Cond +} + +func newBleveIndexerHolder() *bleveIndexerHolder { + b := &bleveIndexerHolder{} + b.cond = sync.NewCond(b.mutex.RLocker()) + return b +} + +func (r *bleveIndexerHolder) set(index bleve.Index) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.index = index + r.cond.Broadcast() +} + +func (r *bleveIndexerHolder) get() bleve.Index { + r.mutex.RLock() + defer r.mutex.RUnlock() + if r.index == nil { + r.cond.Wait() + } + return r.index +} + // repoIndexer (thread-safe) index for repository contents -var repoIndexer bleve.Index +var indexerHolder = newBleveIndexerHolder() // RepoIndexerOp type of operation to perform on repo indexer type RepoIndexerOp int @@ -73,12 +102,12 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) // InitRepoIndexer initialize repo indexer func InitRepoIndexer(populateIndexer func() error) { - var err error - repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { log.Fatal("InitRepoIndexer: %v", err) } - if repoIndexer != nil { + if indexer != nil { + indexerHolder.set(indexer) return } @@ -92,7 +121,6 @@ func InitRepoIndexer(populateIndexer func() error) { // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) error { - var err error docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -103,9 +131,9 @@ func createRepoIndexer(path string, latestVersion int) error { docMapping.AddFieldMappingsAt("Content", textFieldMapping) mapping := bleve.NewIndexMapping() - if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { return err - } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ "type": custom.Name, "char_filters": []string{}, "tokenizer": unicode.Name, @@ -117,10 +145,12 @@ func createRepoIndexer(path string, latestVersion int) error { mapping.AddDocumentMapping(repoIndexerDocType, docMapping) mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - repoIndexer, err = bleve.New(path, mapping) + indexer, err := bleve.New(path, mapping) if err != nil { return err } + indexerHolder.set(indexer) + return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ Version: latestVersion, }) @@ -140,14 +170,14 @@ func filenameOfIndexerID(indexerID string) string { // RepoIndexerBatch batch to add updates to func RepoIndexerBatch() rupture.FlushingBatch { - return rupture.NewFlushingBatch(repoIndexer, maxBatchSize) + return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) } // DeleteRepoFromIndexer delete all of a repo's files from indexer func DeleteRepoFromIndexer(repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return err } @@ -196,7 +226,7 @@ func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (i searchRequest.Fields = []string{"Content", "RepoID"} searchRequest.IncludeLocations = true - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return 0, nil, err } |