summaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues/indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/indexer/issues/indexer.go')
-rw-r--r--modules/indexer/issues/indexer.go199
1 files changed, 125 insertions, 74 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index ebcd3f68dd..894f37a963 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -5,12 +5,16 @@
package issues
import (
+ "context"
+ "fmt"
+ "os"
"sync"
"time"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)
@@ -44,12 +48,14 @@ type Indexer interface {
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
+ Close()
}
type indexerHolder struct {
- indexer Indexer
- mutex sync.RWMutex
- cond *sync.Cond
+ indexer Indexer
+ mutex sync.RWMutex
+ cond *sync.Cond
+ cancelled bool
}
func newIndexerHolder() *indexerHolder {
@@ -58,6 +64,13 @@ func newIndexerHolder() *indexerHolder {
return h
}
+func (h *indexerHolder) cancel() {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ h.cancelled = true
+ h.cond.Broadcast()
+}
+
func (h *indexerHolder) set(indexer Indexer) {
h.mutex.Lock()
defer h.mutex.Unlock()
@@ -68,16 +81,15 @@ func (h *indexerHolder) set(indexer Indexer) {
func (h *indexerHolder) get() Indexer {
h.mutex.RLock()
defer h.mutex.RUnlock()
- if h.indexer == nil {
+ if h.indexer == nil && !h.cancelled {
h.cond.Wait()
}
return h.indexer
}
var (
- issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
// issueIndexerQueue queue of issue ids to be updated
- issueIndexerQueue Queue
+ issueIndexerQueue queue.Queue
holder = newIndexerHolder()
)
@@ -85,90 +97,99 @@ var (
// all issue index done.
func InitIssueIndexer(syncReindex bool) {
waitChannel := make(chan time.Duration)
+
+ // Create the Queue
+ switch setting.Indexer.IssueType {
+ case "bleve":
+ handler := func(data ...queue.Data) {
+ indexer := holder.get()
+ if indexer == nil {
+ log.Error("Issue indexer handler: unable to get indexer!")
+ return
+ }
+
+ iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
+ for _, datum := range data {
+ indexerData, ok := datum.(*IndexerData)
+ if !ok {
+ log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+ continue
+ }
+ log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
+ if indexerData.IsDelete {
+ _ = indexer.Delete(indexerData.IDs...)
+ continue
+ }
+ iData = append(iData, indexerData)
+ }
+ if err := indexer.Index(iData); err != nil {
+ log.Error("Error whilst indexing: %v Error: %v", iData, err)
+ }
+ }
+
+ issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
+
+ if issueIndexerQueue == nil {
+ log.Fatal("Unable to create issue indexer queue")
+ }
+ default:
+ issueIndexerQueue = &queue.DummyQueue{}
+ }
+
+ // Create the Indexer
go func() {
start := time.Now()
- log.Info("Initializing Issue Indexer")
+ log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
var populate bool
- var dummyQueue bool
switch setting.Indexer.IssueType {
case "bleve":
- issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
- exist, err := issueIndexer.Init()
- if err != nil {
- log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
- }
- populate = !exist
- holder.set(issueIndexer)
+ graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+ issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ holder.cancel()
+ log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
+ atTerminate(context.Background(), func() {
+ log.Debug("Closing issue indexer")
+ issueIndexer := holder.get()
+ if issueIndexer != nil {
+ issueIndexer.Close()
+ }
+ log.Info("PID: %d Issue Indexer closed", os.Getpid())
+ })
+ log.Debug("Created Bleve Indexer")
+ })
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
- dummyQueue = true
default:
+ holder.cancel()
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
- if dummyQueue {
- issueIndexerQueue = &DummyQueue{}
- } else {
- var err error
- switch setting.Indexer.IssueQueueType {
- case setting.LevelQueueType:
- issueIndexerQueue, err = NewLevelQueue(
- holder.get(),
- setting.Indexer.IssueQueueDir,
- setting.Indexer.IssueQueueBatchNumber)
- if err != nil {
- log.Fatal(
- "Unable create level queue for issue queue dir: %s batch number: %d : %v",
- setting.Indexer.IssueQueueDir,
- setting.Indexer.IssueQueueBatchNumber,
- err)
- }
- case setting.ChannelQueueType:
- issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber)
- case setting.RedisQueueType:
- addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
- if err != nil {
- log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
- setting.Indexer.IssueQueueConnStr,
- err)
- }
- issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber)
- if err != nil {
- log.Fatal("Unable to create RedisQueue: %s : %v",
- setting.Indexer.IssueQueueConnStr,
- err)
- }
- default:
- log.Fatal("Unsupported indexer queue type: %v",
- setting.Indexer.IssueQueueType)
- }
-
- go func() {
- err = issueIndexerQueue.Run()
- if err != nil {
- log.Error("issueIndexerQueue.Run: %v", err)
- }
- }()
- }
-
- go func() {
- for data := range issueIndexerChannel {
- _ = issueIndexerQueue.Push(data)
- }
- }()
+ // Start processing the queue
+ go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
+ // Populate the index
if populate {
if syncReindex {
- populateIssueIndexer()
+ graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
} else {
- go populateIssueIndexer()
+ go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
}
}
waitChannel <- time.Since(start)
+ close(waitChannel)
}()
+
if syncReindex {
- <-waitChannel
+ select {
+ case <-waitChannel:
+ case <-graceful.GetManager().IsShutdown():
+ }
} else if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
@@ -178,7 +199,12 @@ func InitIssueIndexer(syncReindex bool) {
select {
case duration := <-waitChannel:
log.Info("Issue Indexer Initialization took %v", duration)
+ case <-graceful.GetManager().IsShutdown():
+ log.Warn("Shutdown occurred before issue index initialisation was complete")
case <-time.After(timeout):
+ if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
+ shutdownable.Terminate()
+ }
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
}
}()
@@ -186,8 +212,14 @@ func InitIssueIndexer(syncReindex bool) {
}
// populateIssueIndexer populate the issue indexer with issue data
-func populateIssueIndexer() {
+func populateIssueIndexer(ctx context.Context) {
for page := 1; ; page++ {
+ select {
+ case <-ctx.Done():
+ log.Warn("Issue Indexer population shutdown before completion")
+ return
+ default:
+ }
repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
Page: page,
PageSize: models.RepositoryListDefaultPageSize,
@@ -200,10 +232,17 @@ func populateIssueIndexer() {
continue
}
if len(repos) == 0 {
+ log.Debug("Issue Indexer population complete")
return
}
for _, repo := range repos {
+ select {
+ case <-ctx.Done():
+ log.Info("Issue Indexer population shutdown before completion")
+ return
+ default:
+ }
UpdateRepoIndexer(repo)
}
}
@@ -237,13 +276,17 @@ func UpdateIssueIndexer(issue *models.Issue) {
comments = append(comments, comment.Content)
}
}
- issueIndexerChannel <- &IndexerData{
+ indexerData := &IndexerData{
ID: issue.ID,
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: comments,
}
+ log.Debug("Adding to channel: %v", indexerData)
+ if err := issueIndexerQueue.Push(indexerData); err != nil {
+ log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+ }
}
// DeleteRepoIssueIndexer deletes repo's all issues indexes
@@ -258,17 +301,25 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
if len(ids) == 0 {
return
}
-
- issueIndexerChannel <- &IndexerData{
+ indexerData := &IndexerData{
IDs: ids,
IsDelete: true,
}
+ if err := issueIndexerQueue.Push(indexerData); err != nil {
+ log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+ }
}
// SearchIssuesByKeyword search issue ids by keywords and repo id
func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
- res, err := holder.get().Search(keyword, repoIDs, 1000, 0)
+ indexer := holder.get()
+
+ if indexer == nil {
+ log.Error("SearchIssuesByKeyword(): unable to get indexer!")
+ return nil, fmt.Errorf("unable to get issue indexer")
+ }
+ res, err := indexer.Search(keyword, repoIDs, 1000, 0)
if err != nil {
return nil, err
}