diff options
Diffstat (limited to 'modules/indexer')
-rw-r--r-- | modules/indexer/issues/indexer.go | 35 | ||||
-rw-r--r-- | modules/indexer/issues/indexer_test.go | 2 | ||||
-rw-r--r-- | modules/indexer/issues/queue_redis.go | 145 |
3 files changed, 168 insertions, 14 deletions
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 3d066ca3fa..75e6893b87 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -46,9 +46,9 @@ type Indexer interface { } var ( - // issueIndexerUpdateQueue queue of issue ids to be updated - issueIndexerUpdateQueue Queue - issueIndexer Indexer + // issueIndexerQueue queue of issue ids to be updated + issueIndexerQueue Queue + issueIndexer Indexer ) // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until @@ -72,27 +72,36 @@ func InitIssueIndexer(syncReindex bool) error { } if dummyQueue { - issueIndexerUpdateQueue = &DummyQueue{} + issueIndexerQueue = &DummyQueue{} return nil } var err error - switch setting.Indexer.IssueIndexerQueueType { + switch setting.Indexer.IssueQueueType { case setting.LevelQueueType: - issueIndexerUpdateQueue, err = NewLevelQueue( + issueIndexerQueue, err = NewLevelQueue( issueIndexer, - setting.Indexer.IssueIndexerQueueDir, - setting.Indexer.IssueIndexerQueueBatchNumber) + setting.Indexer.IssueQueueDir, + setting.Indexer.IssueQueueBatchNumber) if err != nil { return err } case setting.ChannelQueueType: - issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber) + issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber) + case setting.RedisQueueType: + addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr) + if err != nil { + return err + } + issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber) + if err != nil { + return err + } default: - return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType) + return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType) } - go issueIndexerUpdateQueue.Run() + go issueIndexerQueue.Run() if populate { if syncReindex { @@ -152,7 +161,7 @@ func UpdateIssueIndexer(issue *models.Issue) { comments = append(comments, comment.Content) } } - issueIndexerUpdateQueue.Push(&IndexerData{ + issueIndexerQueue.Push(&IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, @@ -174,7 +183,7 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { return } - issueIndexerUpdateQueue.Push(&IndexerData{ + issueIndexerQueue.Push(&IndexerData{ IDs: ids, IsDelete: true, }) diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go index 6ecd9fe779..59a7beed47 100644 --- a/modules/indexer/issues/indexer_test.go +++ b/modules/indexer/issues/indexer_test.go @@ -29,7 +29,7 @@ func TestMain(m *testing.M) { func TestBleveSearchIssues(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) - os.RemoveAll(setting.Indexer.IssueIndexerQueueDir) + os.RemoveAll(setting.Indexer.IssueQueueDir) os.RemoveAll(setting.Indexer.IssuePath) setting.Indexer.IssueType = "bleve" if err := InitIssueIndexer(true); err != nil { diff --git a/modules/indexer/issues/queue_redis.go b/modules/indexer/issues/queue_redis.go new file mode 100644 index 0000000000..a9434c4f92 --- /dev/null +++ b/modules/indexer/issues/queue_redis.go @@ -0,0 +1,145 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "encoding/json" + "errors" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/modules/log" + "github.com/go-redis/redis" +) + +var ( + _ Queue = &RedisQueue{} +) + +type redisClient interface { + RPush(key string, args ...interface{}) *redis.IntCmd + LPop(key string) *redis.StringCmd + Ping() *redis.StatusCmd +} + +// RedisQueue redis queue +type RedisQueue struct { + client redisClient + queueName string + indexer Indexer + batchNumber int +} + +func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { + fields := strings.Fields(connStr) + for _, f := range fields { + items := strings.SplitN(f, "=", 2) + if len(items) < 2 { + continue + } + switch strings.ToLower(items[0]) { + case "addrs": + addrs = items[1] + case "password": + password = items[1] + case "db": + dbIdx, err = strconv.Atoi(items[1]) + if err != nil { + return + } + } + } + return +} + +// NewRedisQueue creates single redis or cluster redis queue +func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) { + dbs := strings.Split(addrs, ",") + var queue = RedisQueue{ + queueName: "issue_indexer_queue", + indexer: indexer, + batchNumber: batchNumber, + } + if len(dbs) == 0 { + return nil, errors.New("no redis host found") + } else if len(dbs) == 1 { + queue.client = redis.NewClient(&redis.Options{ + Addr: strings.TrimSpace(dbs[0]), // use default Addr + Password: password, // no password set + DB: dbIdx, // use default DB + }) + } else { + queue.client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: dbs, + }) + } + if err := queue.client.Ping().Err(); err != nil { + return nil, err + } + return &queue, nil +} + +// Run runs the redis queue +func (r *RedisQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, r.batchNumber) + for { + bs, err := r.client.LPop(r.queueName).Bytes() + if err != nil && err != redis.Nil { + log.Error("LPop faile: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + i++ + if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) { + r.indexer.Index(datas) + datas = make([]*IndexerData, 0, r.batchNumber) + i = 0 + } + + if len(bs) <= 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data IndexerData + err = json.Unmarshal(bs, &data) + if err != nil { + log.Error("Unmarshal: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("RedisQueue: task found: %#v", data) + + if data.IsDelete { + if data.ID > 0 { + if err = r.indexer.Delete(data.ID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else if len(data.IDs) > 0 { + if err = r.indexer.Delete(data.IDs...); err != nil { + log.Error("indexer.Delete: %v", err) + } + } + time.Sleep(time.Millisecond * 100) + continue + } + + datas = append(datas, &data) + time.Sleep(time.Millisecond * 100) + } +} + +// Push implements Queue +func (r *RedisQueue) Push(data *IndexerData) error { + bs, err := json.Marshal(data) + if err != nil { + return err + } + return r.client.RPush(r.queueName, bs).Err() +} |