diff options
author | zeripath <art27@cantab.net> | 2019-10-15 14:39:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-15 14:39:51 +0100 |
commit | 167e8f18da3aadcdcdd7bb8c488c39d73ac65803 (patch) | |
tree | c2ad32fc8ced5657f62034551e72134a0a238fcb /modules/indexer | |
parent | 4a290bd64cd4c4ba77b9f3c4908a76cc521f9621 (diff) | |
download | gitea-167e8f18da3aadcdcdd7bb8c488c39d73ac65803.tar.gz gitea-167e8f18da3aadcdcdd7bb8c488c39d73ac65803.zip |
Restore Graceful Restarting & Socket Activation (#7274)
* Prevent deadlock in indexer initialisation during graceful restart
* Move from gracehttp to our own service to add graceful ssh
* Add timeout for start of indexers and make hammer time configurable
* Fix issue with re-initialization in indexer during tests
* move the code to detect use of closed to graceful
* Handle logs gracefully - add a pid suffix just before restart
* Move to using a cond and a holder for indexers
* use time.Since
* Add some comments and attribution
* update modules.txt
* Use zero to disable timeout
* Move RestartProcess to its own file
* Add cleanup routine
Diffstat (limited to 'modules/indexer')
-rw-r--r-- | modules/indexer/issues/indexer.go | 193 | ||||
-rw-r--r-- | modules/indexer/issues/indexer_test.go | 14 | ||||
-rw-r--r-- | modules/indexer/repo.go | 52 |
3 files changed, 173 insertions, 86 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index df8bfd6305..4f410daf4c 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,9 +5,11 @@ package issues import ( - "fmt" + "sync" + "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" @@ -45,78 +47,143 @@ type Indexer interface { Search(kw string, repoID int64, limit, start int) (*SearchResult, error) } +type indexerHolder struct { + indexer Indexer + mutex sync.RWMutex + cond *sync.Cond +} + +func newIndexerHolder() *indexerHolder { + h := &indexerHolder{} + h.cond = sync.NewCond(h.mutex.RLocker()) + return h +} + +func (h *indexerHolder) set(indexer Indexer) { + h.mutex.Lock() + defer h.mutex.Unlock() + h.indexer = indexer + h.cond.Broadcast() +} + +func (h *indexerHolder) get() Indexer { + h.mutex.RLock() + defer h.mutex.RUnlock() + if h.indexer == nil { + h.cond.Wait() + } + return h.indexer +} + var ( + issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength) // issueIndexerQueue queue of issue ids to be updated issueIndexerQueue Queue - issueIndexer Indexer + holder = newIndexerHolder() ) // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until // all issue index done. -func InitIssueIndexer(syncReindex bool) error { - var populate bool - var dummyQueue bool - switch setting.Indexer.IssueType { - case "bleve": - issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() - if err != nil { - return err +func InitIssueIndexer(syncReindex bool) { + waitChannel := make(chan time.Duration) + go func() { + start := time.Now() + log.Info("Initializing Issue Indexer") + var populate bool + var dummyQueue bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) + } + populate = !exist + holder.set(issueIndexer) + case "db": + issueIndexer := &DBIndexer{} + holder.set(issueIndexer) + dummyQueue = true + default: + log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } - populate = !exist - case "db": - issueIndexer = &DBIndexer{} - dummyQueue = true - default: - return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) - } - if dummyQueue { - issueIndexerQueue = &DummyQueue{} - return nil - } + if dummyQueue { + issueIndexerQueue = &DummyQueue{} + } else { + var err error + switch setting.Indexer.IssueQueueType { + case setting.LevelQueueType: + issueIndexerQueue, err = NewLevelQueue( + holder.get(), + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal( + "Unable create level queue for issue queue dir: %s batch number: %d : %v", + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber, + err) + } + case setting.ChannelQueueType: + issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber) + case setting.RedisQueueType: + addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) + if err != nil { + log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal("Unable to create RedisQueue: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + default: + log.Fatal("Unsupported indexer queue type: %v", + setting.Indexer.IssueQueueType) + } - var err error - switch setting.Indexer.IssueQueueType { - case setting.LevelQueueType: - issueIndexerQueue, err = NewLevelQueue( - issueIndexer, - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err + go func() { + err = issueIndexerQueue.Run() + if err != nil { + log.Error("issueIndexerQueue.Run: %v", err) + } + }() } - case setting.ChannelQueueType: - issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber) - case setting.RedisQueueType: - addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) - if err != nil { - return err - } - issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType) - } - go func() { - err = issueIndexerQueue.Run() - if err != nil { - log.Error("issueIndexerQueue.Run: %v", err) - } - }() + go func() { + for data := range issueIndexerChannel { + _ = issueIndexerQueue.Push(data) + } + }() - if populate { - if syncReindex { - populateIssueIndexer() - } else { - go populateIssueIndexer() + if populate { + if syncReindex { + populateIssueIndexer() + } else { + go populateIssueIndexer() + } } + waitChannel <- time.Since(start) + }() + if syncReindex { + <-waitChannel + } else if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.IsChild && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Issue Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) + } + }() } - - return nil } // populateIssueIndexer populate the issue indexer with issue data @@ -166,13 +233,13 @@ func UpdateIssueIndexer(issue *models.Issue) { comments = append(comments, comment.Content) } } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, Content: issue.Content, Comments: comments, - }) + } } // DeleteRepoIssueIndexer deletes repo's all issues indexes @@ -188,16 +255,16 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { return } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ IDs: ids, IsDelete: true, - }) + } } // SearchIssuesByKeyword search issue ids by keywords and repo id func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { var issueIDs []int64 - res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + res, err := holder.get().Search(keyword, repoID, 1000, 0) if err != nil { return nil, err } diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go index 59a7beed47..212c2edfbe 100644 --- a/modules/indexer/issues/indexer_test.go +++ b/modules/indexer/issues/indexer_test.go @@ -5,7 +5,6 @@ package issues import ( - "fmt" "os" "path/filepath" "testing" @@ -17,11 +16,6 @@ import ( "github.com/stretchr/testify/assert" ) -func fatalTestError(fmtStr string, args ...interface{}) { - fmt.Fprintf(os.Stderr, fmtStr, args...) - os.Exit(1) -} - func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..", "..")) } @@ -32,9 +26,7 @@ func TestBleveSearchIssues(t *testing.T) { os.RemoveAll(setting.Indexer.IssueQueueDir) os.RemoveAll(setting.Indexer.IssuePath) setting.Indexer.IssueType = "bleve" - if err := InitIssueIndexer(true); err != nil { - fatalTestError("Error InitIssueIndexer: %v\n", err) - } + InitIssueIndexer(true) time.Sleep(5 * time.Second) @@ -59,9 +51,7 @@ func TestDBSearchIssues(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) setting.Indexer.IssueType = "db" - if err := InitIssueIndexer(true); err != nil { - fatalTestError("Error InitIssueIndexer: %v\n", err) - } + InitIssueIndexer(true) ids, err := SearchIssuesByKeyword(1, "issue2") assert.NoError(t, err) diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go index 91ed173aa7..841f29acd7 100644 --- a/modules/indexer/repo.go +++ b/modules/indexer/repo.go @@ -6,6 +6,7 @@ package indexer import ( "strings" + "sync" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -25,8 +26,36 @@ const ( repoIndexerLatestVersion = 4 ) +type bleveIndexerHolder struct { + index bleve.Index + mutex sync.RWMutex + cond *sync.Cond +} + +func newBleveIndexerHolder() *bleveIndexerHolder { + b := &bleveIndexerHolder{} + b.cond = sync.NewCond(b.mutex.RLocker()) + return b +} + +func (r *bleveIndexerHolder) set(index bleve.Index) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.index = index + r.cond.Broadcast() +} + +func (r *bleveIndexerHolder) get() bleve.Index { + r.mutex.RLock() + defer r.mutex.RUnlock() + if r.index == nil { + r.cond.Wait() + } + return r.index +} + // repoIndexer (thread-safe) index for repository contents -var repoIndexer bleve.Index +var indexerHolder = newBleveIndexerHolder() // RepoIndexerOp type of operation to perform on repo indexer type RepoIndexerOp int @@ -73,12 +102,12 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) // InitRepoIndexer initialize repo indexer func InitRepoIndexer(populateIndexer func() error) { - var err error - repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) + indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { log.Fatal("InitRepoIndexer: %v", err) } - if repoIndexer != nil { + if indexer != nil { + indexerHolder.set(indexer) return } @@ -92,7 +121,6 @@ func InitRepoIndexer(populateIndexer func() error) { // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer(path string, latestVersion int) error { - var err error docMapping := bleve.NewDocumentMapping() numericFieldMapping := bleve.NewNumericFieldMapping() numericFieldMapping.IncludeInAll = false @@ -103,9 +131,9 @@ func createRepoIndexer(path string, latestVersion int) error { docMapping.AddFieldMappingsAt("Content", textFieldMapping) mapping := bleve.NewIndexMapping() - if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { return err - } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ "type": custom.Name, "char_filters": []string{}, "tokenizer": unicode.Name, @@ -117,10 +145,12 @@ func createRepoIndexer(path string, latestVersion int) error { mapping.AddDocumentMapping(repoIndexerDocType, docMapping) mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - repoIndexer, err = bleve.New(path, mapping) + indexer, err := bleve.New(path, mapping) if err != nil { return err } + indexerHolder.set(indexer) + return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ Version: latestVersion, }) @@ -140,14 +170,14 @@ func filenameOfIndexerID(indexerID string) string { // RepoIndexerBatch batch to add updates to func RepoIndexerBatch() rupture.FlushingBatch { - return rupture.NewFlushingBatch(repoIndexer, maxBatchSize) + return rupture.NewFlushingBatch(indexerHolder.get(), maxBatchSize) } // DeleteRepoFromIndexer delete all of a repo's files from indexer func DeleteRepoFromIndexer(repoID int64) error { query := numericEqualityQuery(repoID, "RepoID") searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return err } @@ -196,7 +226,7 @@ func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (i searchRequest.Fields = []string{"Content", "RepoID"} searchRequest.IncludeLocations = true - result, err := repoIndexer.Search(searchRequest) + result, err := indexerHolder.get().Search(searchRequest) if err != nil { return 0, nil, err } |