summaryrefslogtreecommitdiffstats
path: root/models/issue_indexer.go
blob: 48c0b9f2466c81cd732673a7d2fb5f09bb1a6326 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2017 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package models

import (
	"fmt"

	"code.gitea.io/gitea/modules/indexer"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/util"
)

// issueIndexerUpdateQueue queue of issue ids to be updated
var issueIndexerUpdateQueue chan int64

// InitIssueIndexer initialize issue indexer
func InitIssueIndexer() {
	indexer.InitIssueIndexer(populateIssueIndexer)
	issueIndexerUpdateQueue = make(chan int64, setting.Indexer.UpdateQueueLength)
	go processIssueIndexerUpdateQueue()
}

// populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer() error {
	batch := indexer.IssueIndexerBatch()
	for page := 1; ; page++ {
		repos, _, err := SearchRepositoryByName(&SearchRepoOptions{
			Page:        page,
			PageSize:    RepositoryListDefaultPageSize,
			OrderBy:     SearchOrderByID,
			Private:     true,
			Collaborate: util.OptionalBoolFalse,
		})
		if err != nil {
			return fmt.Errorf("Repositories: %v", err)
		}
		if len(repos) == 0 {
			return batch.Flush()
		}
		for _, repo := range repos {
			issues, err := Issues(&IssuesOptions{
				RepoIDs:  []int64{repo.ID},
				IsClosed: util.OptionalBoolNone,
				IsPull:   util.OptionalBoolNone,
			})
			if err != nil {
				return err
			}
			if err = IssueList(issues).LoadComments(); err != nil {
				return err
			}
			for _, issue := range issues {
				if err := issue.update().AddToFlushingBatch(batch); err != nil {
					return err
				}
			}
		}
	}
}

func processIssueIndexerUpdateQueue() {
	batch := indexer.IssueIndexerBatch()
	for {
		var issueID int64
		select {
		case issueID = <-issueIndexerUpdateQueue:
		default:
			// flush whatever updates we currently have, since we
			// might have to wait a while
			if err := batch.Flush(); err != nil {
				log.Error(4, "IssueIndexer: %v", err)
			}
			issueID = <-issueIndexerUpdateQueue
		}
		issue, err := GetIssueByID(issueID)
		if err != nil {
			log.Error(4, "GetIssueByID: %v", err)
		} else if err = issue.update().AddToFlushingBatch(batch); err != nil {
			log.Error(4, "IssueIndexer: %v", err)
		}
	}
}

func (issue *Issue) update() indexer.IssueIndexerUpdate {
	comments := make([]string, 0, 5)
	for _, comment := range issue.Comments {
		if comment.Type == CommentTypeComment {
			comments = append(comments, comment.Content)
		}
	}
	return indexer.IssueIndexerUpdate{
		IssueID: issue.ID,
		Data: &indexer.IssueIndexerData{
			RepoID:   issue.RepoID,
			Title:    issue.Title,
			Content:  issue.Content,
			Comments: comments,
		},
	}
}

// updateNeededCols whether a change to the specified columns requires updating
// the issue indexer
func updateNeededCols(cols []string) bool {
	for _, col := range cols {
		switch col {
		case "name", "content":
			return true
		}
	}
	return false
}

// UpdateIssueIndexerCols update an issue in the issue indexer, given changes
// to the specified columns
func UpdateIssueIndexerCols(issueID int64, cols ...string) {
	updateNeededCols(cols)
}

// UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(issueID int64) {
	select {
	case issueIndexerUpdateQueue <- issueID:
	default:
		go func() {
			issueIndexerUpdateQueue <- issueID
		}()
	}
}