aboutsummaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/indexer/issues/indexer.go25
-rw-r--r--modules/indexer/issues/util.go46
2 files changed, 41 insertions, 30 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 020659c82b..ef06d8862a 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -204,12 +204,13 @@ func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMeta
func populateIssueIndexer(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true)
defer finished()
- if err := PopulateIssueIndexer(ctx, true); err != nil {
+ ctx = contextWithKeepRetry(ctx) // keep retrying since it's a background task
+ if err := PopulateIssueIndexer(ctx); err != nil {
log.Error("Issue indexer population failed: %v", err)
}
}
-func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error {
+func PopulateIssueIndexer(ctx context.Context) error {
for page := 1; ; page++ {
select {
case <-ctx.Done():
@@ -232,20 +233,8 @@ func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error {
}
for _, repo := range repos {
- for {
- select {
- case <-ctx.Done():
- return fmt.Errorf("shutdown before completion: %w", ctx.Err())
- default:
- }
- if err := updateRepoIndexer(ctx, repo.ID); err != nil {
- if keepRetrying && ctx.Err() == nil {
- log.Warn("Retry to populate issue indexer for repo %d: %v", repo.ID, err)
- continue
- }
- return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
- }
- break
+ if err := updateRepoIndexer(ctx, repo.ID); err != nil {
+ return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
}
}
}
@@ -259,8 +248,8 @@ func UpdateRepoIndexer(ctx context.Context, repoID int64) {
}
// UpdateIssueIndexer add/update an issue to the issue indexer
-func UpdateIssueIndexer(issueID int64) {
- if err := updateIssueIndexer(issueID); err != nil {
+func UpdateIssueIndexer(ctx context.Context, issueID int64) {
+ if err := updateIssueIndexer(ctx, issueID); err != nil {
log.Error("Unable to push issue %d to issue indexer: %v", issueID, err)
}
}
diff --git a/modules/indexer/issues/util.go b/modules/indexer/issues/util.go
index ca4ff6d42f..510b4060b2 100644
--- a/modules/indexer/issues/util.go
+++ b/modules/indexer/issues/util.go
@@ -127,15 +127,15 @@ func updateRepoIndexer(ctx context.Context, repoID int64) error {
return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err)
}
for _, id := range ids {
- if err := updateIssueIndexer(id); err != nil {
+ if err := updateIssueIndexer(ctx, id); err != nil {
return err
}
}
return nil
}
-func updateIssueIndexer(issueID int64) error {
- return pushIssueIndexerQueue(&IndexerMetadata{ID: issueID})
+func updateIssueIndexer(ctx context.Context, issueID int64) error {
+ return pushIssueIndexerQueue(ctx, &IndexerMetadata{ID: issueID})
}
func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
@@ -148,13 +148,21 @@ func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error {
if len(ids) == 0 {
return nil
}
- return pushIssueIndexerQueue(&IndexerMetadata{
+ return pushIssueIndexerQueue(ctx, &IndexerMetadata{
IDs: ids,
IsDelete: true,
})
}
-func pushIssueIndexerQueue(data *IndexerMetadata) error {
+type keepRetryKey struct{}
+
+// contextWithKeepRetry returns a context with a key indicating that the indexer should keep retrying.
+// Please note that it's for background tasks only, and it should not be used for user requests, or it may cause blocking.
+func contextWithKeepRetry(ctx context.Context) context.Context {
+ return context.WithValue(ctx, keepRetryKey{}, true)
+}
+
+func pushIssueIndexerQueue(ctx context.Context, data *IndexerMetadata) error {
if issueIndexerQueue == nil {
// Some unit tests will trigger indexing, but the queue is not initialized.
// It's OK to ignore it, but log a warning message in case it's not a unit test.
@@ -162,12 +170,26 @@ func pushIssueIndexerQueue(data *IndexerMetadata) error {
return nil
}
- err := issueIndexerQueue.Push(data)
- if errors.Is(err, queue.ErrAlreadyInQueue) {
- return nil
- }
- if errors.Is(err, context.DeadlineExceeded) {
- log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.")
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ err := issueIndexerQueue.Push(data)
+ if errors.Is(err, queue.ErrAlreadyInQueue) {
+ return nil
+ }
+ if errors.Is(err, context.DeadlineExceeded) { // the queue is full
+ log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.")
+ if ctx.Value(keepRetryKey{}) == nil {
+ return err
+ }
+ // It will be better to increase the queue size instead of retrying, but users may ignore the previous warning message.
+ // However, even it retries, it may still cause index loss when there's a deadline in the context.
+ log.Debug("Retry to push %+v to issue indexer queue", data)
+ continue
+ }
+ return err
}
- return err
}