aboutsummaryrefslogtreecommitdiffstats
path: root/modules/indexer/issues
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-01-07 11:23:09 +0000
committerAntoine GIRARD <sapk@users.noreply.github.com>2020-01-07 12:23:09 +0100
commit62eb1b0f2530a5ae1ce9b729378c0c8066174215 (patch)
treee567b2a9d91e69c0f2bccfeaf1a7341b4dda2706 /modules/indexer/issues
parentf71e1c8e796b099f4634bcd358e48189a97dcbad (diff)
downloadgitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.tar.gz
gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.zip
Graceful Queues: Issue Indexing and Tasks (#9363)
* Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'modules/indexer/issues')
-rw-r--r--modules/indexer/issues/db.go4
-rw-r--r--modules/indexer/issues/indexer.go199
-rw-r--r--modules/indexer/issues/indexer_test.go4
-rw-r--r--modules/indexer/issues/queue.go25
-rw-r--r--modules/indexer/issues/queue_channel.go62
-rw-r--r--modules/indexer/issues/queue_disk.go104
-rw-r--r--modules/indexer/issues/queue_redis.go146
7 files changed, 133 insertions, 411 deletions
diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go
index a758cfeaee..d0cca4fd18 100644
--- a/modules/indexer/issues/db.go
+++ b/modules/indexer/issues/db.go
@@ -25,6 +25,10 @@ func (db *DBIndexer) Delete(ids ...int64) error {
return nil
}
+// Close dummy function
+func (db *DBIndexer) Close() {
+}
+
// Search dummy function
func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index ebcd3f68dd..894f37a963 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -5,12 +5,16 @@
package issues
import (
+ "context"
+ "fmt"
+ "os"
"sync"
"time"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)
@@ -44,12 +48,14 @@ type Indexer interface {
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
+ Close()
}
type indexerHolder struct {
- indexer Indexer
- mutex sync.RWMutex
- cond *sync.Cond
+ indexer Indexer
+ mutex sync.RWMutex
+ cond *sync.Cond
+ cancelled bool
}
func newIndexerHolder() *indexerHolder {
@@ -58,6 +64,13 @@ func newIndexerHolder() *indexerHolder {
return h
}
+func (h *indexerHolder) cancel() {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ h.cancelled = true
+ h.cond.Broadcast()
+}
+
func (h *indexerHolder) set(indexer Indexer) {
h.mutex.Lock()
defer h.mutex.Unlock()
@@ -68,16 +81,15 @@ func (h *indexerHolder) set(indexer Indexer) {
func (h *indexerHolder) get() Indexer {
h.mutex.RLock()
defer h.mutex.RUnlock()
- if h.indexer == nil {
+ if h.indexer == nil && !h.cancelled {
h.cond.Wait()
}
return h.indexer
}
var (
- issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
// issueIndexerQueue queue of issue ids to be updated
- issueIndexerQueue Queue
+ issueIndexerQueue queue.Queue
holder = newIndexerHolder()
)
@@ -85,90 +97,99 @@ var (
// all issue index done.
func InitIssueIndexer(syncReindex bool) {
waitChannel := make(chan time.Duration)
+
+ // Create the Queue
+ switch setting.Indexer.IssueType {
+ case "bleve":
+ handler := func(data ...queue.Data) {
+ indexer := holder.get()
+ if indexer == nil {
+ log.Error("Issue indexer handler: unable to get indexer!")
+ return
+ }
+
+ iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
+ for _, datum := range data {
+ indexerData, ok := datum.(*IndexerData)
+ if !ok {
+ log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+ continue
+ }
+ log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
+ if indexerData.IsDelete {
+ _ = indexer.Delete(indexerData.IDs...)
+ continue
+ }
+ iData = append(iData, indexerData)
+ }
+ if err := indexer.Index(iData); err != nil {
+ log.Error("Error whilst indexing: %v Error: %v", iData, err)
+ }
+ }
+
+ issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
+
+ if issueIndexerQueue == nil {
+ log.Fatal("Unable to create issue indexer queue")
+ }
+ default:
+ issueIndexerQueue = &queue.DummyQueue{}
+ }
+
+ // Create the Indexer
go func() {
start := time.Now()
- log.Info("Initializing Issue Indexer")
+ log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
var populate bool
- var dummyQueue bool
switch setting.Indexer.IssueType {
case "bleve":
- issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
- exist, err := issueIndexer.Init()
- if err != nil {
- log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
- }
- populate = !exist
- holder.set(issueIndexer)
+ graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+ issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ holder.cancel()
+ log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
+ atTerminate(context.Background(), func() {
+ log.Debug("Closing issue indexer")
+ issueIndexer := holder.get()
+ if issueIndexer != nil {
+ issueIndexer.Close()
+ }
+ log.Info("PID: %d Issue Indexer closed", os.Getpid())
+ })
+ log.Debug("Created Bleve Indexer")
+ })
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
- dummyQueue = true
default:
+ holder.cancel()
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
- if dummyQueue {
- issueIndexerQueue = &DummyQueue{}
- } else {
- var err error
- switch setting.Indexer.IssueQueueType {
- case setting.LevelQueueType:
- issueIndexerQueue, err = NewLevelQueue(
- holder.get(),
- setting.Indexer.IssueQueueDir,
- setting.Indexer.IssueQueueBatchNumber)
- if err != nil {
- log.Fatal(
- "Unable create level queue for issue queue dir: %s batch number: %d : %v",
- setting.Indexer.IssueQueueDir,
- setting.Indexer.IssueQueueBatchNumber,
- err)
- }
- case setting.ChannelQueueType:
- issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber)
- case setting.RedisQueueType:
- addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
- if err != nil {
- log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
- setting.Indexer.IssueQueueConnStr,
- err)
- }
- issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber)
- if err != nil {
- log.Fatal("Unable to create RedisQueue: %s : %v",
- setting.Indexer.IssueQueueConnStr,
- err)
- }
- default:
- log.Fatal("Unsupported indexer queue type: %v",
- setting.Indexer.IssueQueueType)
- }
-
- go func() {
- err = issueIndexerQueue.Run()
- if err != nil {
- log.Error("issueIndexerQueue.Run: %v", err)
- }
- }()
- }
-
- go func() {
- for data := range issueIndexerChannel {
- _ = issueIndexerQueue.Push(data)
- }
- }()
+ // Start processing the queue
+ go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
+ // Populate the index
if populate {
if syncReindex {
- populateIssueIndexer()
+ graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
} else {
- go populateIssueIndexer()
+ go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
}
}
waitChannel <- time.Since(start)
+ close(waitChannel)
}()
+
if syncReindex {
- <-waitChannel
+ select {
+ case <-waitChannel:
+ case <-graceful.GetManager().IsShutdown():
+ }
} else if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
@@ -178,7 +199,12 @@ func InitIssueIndexer(syncReindex bool) {
select {
case duration := <-waitChannel:
log.Info("Issue Indexer Initialization took %v", duration)
+ case <-graceful.GetManager().IsShutdown():
+ log.Warn("Shutdown occurred before issue index initialisation was complete")
case <-time.After(timeout):
+ if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
+ shutdownable.Terminate()
+ }
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
}
}()
@@ -186,8 +212,14 @@ func InitIssueIndexer(syncReindex bool) {
}
// populateIssueIndexer populate the issue indexer with issue data
-func populateIssueIndexer() {
+func populateIssueIndexer(ctx context.Context) {
for page := 1; ; page++ {
+ select {
+ case <-ctx.Done():
+ log.Warn("Issue Indexer population shutdown before completion")
+ return
+ default:
+ }
repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
Page: page,
PageSize: models.RepositoryListDefaultPageSize,
@@ -200,10 +232,17 @@ func populateIssueIndexer() {
continue
}
if len(repos) == 0 {
+ log.Debug("Issue Indexer population complete")
return
}
for _, repo := range repos {
+ select {
+ case <-ctx.Done():
+ log.Info("Issue Indexer population shutdown before completion")
+ return
+ default:
+ }
UpdateRepoIndexer(repo)
}
}
@@ -237,13 +276,17 @@ func UpdateIssueIndexer(issue *models.Issue) {
comments = append(comments, comment.Content)
}
}
- issueIndexerChannel <- &IndexerData{
+ indexerData := &IndexerData{
ID: issue.ID,
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: comments,
}
+ log.Debug("Adding to channel: %v", indexerData)
+ if err := issueIndexerQueue.Push(indexerData); err != nil {
+ log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+ }
}
// DeleteRepoIssueIndexer deletes repo's all issues indexes
@@ -258,17 +301,25 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
if len(ids) == 0 {
return
}
-
- issueIndexerChannel <- &IndexerData{
+ indexerData := &IndexerData{
IDs: ids,
IsDelete: true,
}
+ if err := issueIndexerQueue.Push(indexerData); err != nil {
+ log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+ }
}
// SearchIssuesByKeyword search issue ids by keywords and repo id
func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
- res, err := holder.get().Search(keyword, repoIDs, 1000, 0)
+ indexer := holder.get()
+
+ if indexer == nil {
+ log.Error("SearchIssuesByKeyword(): unable to get indexer!")
+ return nil, fmt.Errorf("unable to get issue indexer")
+ }
+ res, err := indexer.Search(keyword, repoIDs, 1000, 0)
if err != nil {
return nil, err
}
diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go
index ca7ba29703..4028a6c8b5 100644
--- a/modules/indexer/issues/indexer_test.go
+++ b/modules/indexer/issues/indexer_test.go
@@ -15,6 +15,8 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/setting"
+ "gopkg.in/ini.v1"
+
"github.com/stretchr/testify/assert"
)
@@ -24,6 +26,7 @@ func TestMain(m *testing.M) {
func TestBleveSearchIssues(t *testing.T) {
assert.NoError(t, models.PrepareTestDatabase())
+ setting.Cfg = ini.Empty()
tmpIndexerDir, err := ioutil.TempDir("", "issues-indexer")
if err != nil {
@@ -41,6 +44,7 @@ func TestBleveSearchIssues(t *testing.T) {
}()
setting.Indexer.IssueType = "bleve"
+ setting.NewQueueService()
InitIssueIndexer(true)
defer func() {
indexer := holder.get()
diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go
deleted file mode 100644
index f93e5c47a4..0000000000
--- a/modules/indexer/issues/queue.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2018 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-// Queue defines an interface to save an issue indexer queue
-type Queue interface {
- Run() error
- Push(*IndexerData) error
-}
-
-// DummyQueue represents an empty queue
-type DummyQueue struct {
-}
-
-// Run starts to run the queue
-func (b *DummyQueue) Run() error {
- return nil
-}
-
-// Push pushes data to indexer
-func (b *DummyQueue) Push(*IndexerData) error {
- return nil
-}
diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go
deleted file mode 100644
index b6458d3eb5..0000000000
--- a/modules/indexer/issues/queue_channel.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright 2018 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
- "time"
-
- "code.gitea.io/gitea/modules/setting"
-)
-
-// ChannelQueue implements
-type ChannelQueue struct {
- queue chan *IndexerData
- indexer Indexer
- batchNumber int
-}
-
-// NewChannelQueue create a memory channel queue
-func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
- return &ChannelQueue{
- queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
- indexer: indexer,
- batchNumber: batchNumber,
- }
-}
-
-// Run starts to run the queue
-func (c *ChannelQueue) Run() error {
- var i int
- var datas = make([]*IndexerData, 0, c.batchNumber)
- for {
- select {
- case data := <-c.queue:
- if data.IsDelete {
- _ = c.indexer.Delete(data.IDs...)
- continue
- }
-
- datas = append(datas, data)
- if len(datas) >= c.batchNumber {
- _ = c.indexer.Index(datas)
- // TODO: save the point
- datas = make([]*IndexerData, 0, c.batchNumber)
- }
- case <-time.After(time.Millisecond * 100):
- i++
- if i >= 3 && len(datas) > 0 {
- _ = c.indexer.Index(datas)
- // TODO: save the point
- datas = make([]*IndexerData, 0, c.batchNumber)
- }
- }
- }
-}
-
-// Push will push the indexer data to queue
-func (c *ChannelQueue) Push(data *IndexerData) error {
- c.queue <- data
- return nil
-}
diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go
deleted file mode 100644
index d6187f2acb..0000000000
--- a/modules/indexer/issues/queue_disk.go
+++ /dev/null
@@ -1,104 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
- "encoding/json"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "gitea.com/lunny/levelqueue"
-)
-
-var (
- _ Queue = &LevelQueue{}
-)
-
-// LevelQueue implements a disk library queue
-type LevelQueue struct {
- indexer Indexer
- queue *levelqueue.Queue
- batchNumber int
-}
-
-// NewLevelQueue creates a ledis local queue
-func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
- queue, err := levelqueue.Open(dataDir)
- if err != nil {
- return nil, err
- }
-
- return &LevelQueue{
- indexer: indexer,
- queue: queue,
- batchNumber: batchNumber,
- }, nil
-}
-
-// Run starts to run the queue
-func (l *LevelQueue) Run() error {
- var i int
- var datas = make([]*IndexerData, 0, l.batchNumber)
- for {
- i++
- if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
- _ = l.indexer.Index(datas)
- datas = make([]*IndexerData, 0, l.batchNumber)
- i = 0
- continue
- }
-
- bs, err := l.queue.RPop()
- if err != nil {
- if err != levelqueue.ErrNotFound {
- log.Error("RPop: %v", err)
- }
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- if len(bs) == 0 {
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- var data IndexerData
- err = json.Unmarshal(bs, &data)
- if err != nil {
- log.Error("Unmarshal: %v", err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- log.Trace("LevelQueue: task found: %#v", data)
-
- if data.IsDelete {
- if data.ID > 0 {
- if err = l.indexer.Delete(data.ID); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- } else if len(data.IDs) > 0 {
- if err = l.indexer.Delete(data.IDs...); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- }
- time.Sleep(time.Millisecond * 10)
- continue
- }
-
- datas = append(datas, &data)
- time.Sleep(time.Millisecond * 10)
- }
-}
-
-// Push will push the indexer data to queue
-func (l *LevelQueue) Push(data *IndexerData) error {
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- return l.queue.LPush(bs)
-}
diff --git a/modules/indexer/issues/queue_redis.go b/modules/indexer/issues/queue_redis.go
deleted file mode 100644
index 0344d3c87a..0000000000
--- a/modules/indexer/issues/queue_redis.go
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
- "encoding/json"
- "errors"
- "strconv"
- "strings"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/go-redis/redis"
-)
-
-var (
- _ Queue = &RedisQueue{}
-)
-
-type redisClient interface {
- RPush(key string, args ...interface{}) *redis.IntCmd
- LPop(key string) *redis.StringCmd
- Ping() *redis.StatusCmd
-}
-
-// RedisQueue redis queue
-type RedisQueue struct {
- client redisClient
- queueName string
- indexer Indexer
- batchNumber int
-}
-
-func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
- fields := strings.Fields(connStr)
- for _, f := range fields {
- items := strings.SplitN(f, "=", 2)
- if len(items) < 2 {
- continue
- }
- switch strings.ToLower(items[0]) {
- case "addrs":
- addrs = items[1]
- case "password":
- password = items[1]
- case "db":
- dbIdx, err = strconv.Atoi(items[1])
- if err != nil {
- return
- }
- }
- }
- return
-}
-
-// NewRedisQueue creates single redis or cluster redis queue
-func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) {
- dbs := strings.Split(addrs, ",")
- var queue = RedisQueue{
- queueName: "issue_indexer_queue",
- indexer: indexer,
- batchNumber: batchNumber,
- }
- if len(dbs) == 0 {
- return nil, errors.New("no redis host found")
- } else if len(dbs) == 1 {
- queue.client = redis.NewClient(&redis.Options{
- Addr: strings.TrimSpace(dbs[0]), // use default Addr
- Password: password, // no password set
- DB: dbIdx, // use default DB
- })
- } else {
- queue.client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: dbs,
- })
- }
- if err := queue.client.Ping().Err(); err != nil {
- return nil, err
- }
- return &queue, nil
-}
-
-// Run runs the redis queue
-func (r *RedisQueue) Run() error {
- var i int
- var datas = make([]*IndexerData, 0, r.batchNumber)
- for {
- bs, err := r.client.LPop(r.queueName).Bytes()
- if err != nil && err != redis.Nil {
- log.Error("LPop faile: %v", err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- i++
- if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) {
- _ = r.indexer.Index(datas)
- datas = make([]*IndexerData, 0, r.batchNumber)
- i = 0
- }
-
- if len(bs) == 0 {
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- var data IndexerData
- err = json.Unmarshal(bs, &data)
- if err != nil {
- log.Error("Unmarshal: %v", err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- log.Trace("RedisQueue: task found: %#v", data)
-
- if data.IsDelete {
- if data.ID > 0 {
- if err = r.indexer.Delete(data.ID); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- } else if len(data.IDs) > 0 {
- if err = r.indexer.Delete(data.IDs...); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- }
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- datas = append(datas, &data)
- time.Sleep(time.Millisecond * 100)
- }
-}
-
-// Push implements Queue
-func (r *RedisQueue) Push(data *IndexerData) error {
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- return r.client.RPush(r.queueName, bs).Err()
-}