diff options
Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r-- | modules/indexer/issues/indexer.go | 193 |
1 files changed, 130 insertions, 63 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index df8bfd6305..4f410daf4c 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -5,9 +5,11 @@ package issues import ( - "fmt" + "sync" + "time" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" @@ -45,78 +47,143 @@ type Indexer interface { Search(kw string, repoID int64, limit, start int) (*SearchResult, error) } +type indexerHolder struct { + indexer Indexer + mutex sync.RWMutex + cond *sync.Cond +} + +func newIndexerHolder() *indexerHolder { + h := &indexerHolder{} + h.cond = sync.NewCond(h.mutex.RLocker()) + return h +} + +func (h *indexerHolder) set(indexer Indexer) { + h.mutex.Lock() + defer h.mutex.Unlock() + h.indexer = indexer + h.cond.Broadcast() +} + +func (h *indexerHolder) get() Indexer { + h.mutex.RLock() + defer h.mutex.RUnlock() + if h.indexer == nil { + h.cond.Wait() + } + return h.indexer +} + var ( + issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength) // issueIndexerQueue queue of issue ids to be updated issueIndexerQueue Queue - issueIndexer Indexer + holder = newIndexerHolder() ) // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until // all issue index done. -func InitIssueIndexer(syncReindex bool) error { - var populate bool - var dummyQueue bool - switch setting.Indexer.IssueType { - case "bleve": - issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath) - exist, err := issueIndexer.Init() - if err != nil { - return err +func InitIssueIndexer(syncReindex bool) { + waitChannel := make(chan time.Duration) + go func() { + start := time.Now() + log.Info("Initializing Issue Indexer") + var populate bool + var dummyQueue bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) + } + populate = !exist + holder.set(issueIndexer) + case "db": + issueIndexer := &DBIndexer{} + holder.set(issueIndexer) + dummyQueue = true + default: + log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } - populate = !exist - case "db": - issueIndexer = &DBIndexer{} - dummyQueue = true - default: - return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) - } - if dummyQueue { - issueIndexerQueue = &DummyQueue{} - return nil - } + if dummyQueue { + issueIndexerQueue = &DummyQueue{} + } else { + var err error + switch setting.Indexer.IssueQueueType { + case setting.LevelQueueType: + issueIndexerQueue, err = NewLevelQueue( + holder.get(), + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal( + "Unable create level queue for issue queue dir: %s batch number: %d : %v", + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber, + err) + } + case setting.ChannelQueueType: + issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber) + case setting.RedisQueueType: + addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) + if err != nil { + log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber) + if err != nil { + log.Fatal("Unable to create RedisQueue: %s : %v", + setting.Indexer.IssueQueueConnStr, + err) + } + default: + log.Fatal("Unsupported indexer queue type: %v", + setting.Indexer.IssueQueueType) + } - var err error - switch setting.Indexer.IssueQueueType { - case setting.LevelQueueType: - issueIndexerQueue, err = NewLevelQueue( - issueIndexer, - setting.Indexer.IssueQueueDir, - setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err + go func() { + err = issueIndexerQueue.Run() + if err != nil { + log.Error("issueIndexerQueue.Run: %v", err) + } + }() } - case setting.ChannelQueueType: - issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber) - case setting.RedisQueueType: - addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) - if err != nil { - return err - } - issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType) - } - go func() { - err = issueIndexerQueue.Run() - if err != nil { - log.Error("issueIndexerQueue.Run: %v", err) - } - }() + go func() { + for data := range issueIndexerChannel { + _ = issueIndexerQueue.Push(data) + } + }() - if populate { - if syncReindex { - populateIssueIndexer() - } else { - go populateIssueIndexer() + if populate { + if syncReindex { + populateIssueIndexer() + } else { + go populateIssueIndexer() + } } + waitChannel <- time.Since(start) + }() + if syncReindex { + <-waitChannel + } else if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.IsChild && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Issue Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) + } + }() } - - return nil } // populateIssueIndexer populate the issue indexer with issue data @@ -166,13 +233,13 @@ func UpdateIssueIndexer(issue *models.Issue) { comments = append(comments, comment.Content) } } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, Content: issue.Content, Comments: comments, - }) + } } // DeleteRepoIssueIndexer deletes repo's all issues indexes @@ -188,16 +255,16 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { return } - _ = issueIndexerQueue.Push(&IndexerData{ + issueIndexerChannel <- &IndexerData{ IDs: ids, IsDelete: true, - }) + } } // SearchIssuesByKeyword search issue ids by keywords and repo id func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { var issueIDs []int64 - res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + res, err := holder.get().Search(keyword, repoID, 1000, 0) if err != nil { return nil, err } |