summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2020-01-07 11:23:09 +0000
committerAntoine GIRARD <sapk@users.noreply.github.com>2020-01-07 12:23:09 +0100
commit62eb1b0f2530a5ae1ce9b729378c0c8066174215 (patch)
treee567b2a9d91e69c0f2bccfeaf1a7341b4dda2706 /modules
parentf71e1c8e796b099f4634bcd358e48189a97dcbad (diff)
downloadgitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.tar.gz
gitea-62eb1b0f2530a5ae1ce9b729378c0c8066174215.zip
Graceful Queues: Issue Indexing and Tasks (#9363)
* Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
Diffstat (limited to 'modules')
-rw-r--r--modules/indexer/issues/db.go4
-rw-r--r--modules/indexer/issues/indexer.go199
-rw-r--r--modules/indexer/issues/indexer_test.go4
-rw-r--r--modules/indexer/issues/queue.go25
-rw-r--r--modules/indexer/issues/queue_channel.go62
-rw-r--r--modules/indexer/issues/queue_disk.go104
-rw-r--r--modules/indexer/issues/queue_redis.go146
-rw-r--r--modules/queue/manager.go270
-rw-r--r--modules/queue/queue.go133
-rw-r--r--modules/queue/queue_channel.go106
-rw-r--r--modules/queue/queue_channel_test.go91
-rw-r--r--modules/queue/queue_disk.go213
-rw-r--r--modules/queue/queue_disk_channel.go193
-rw-r--r--modules/queue/queue_disk_channel_test.go117
-rw-r--r--modules/queue/queue_disk_test.go126
-rw-r--r--modules/queue/queue_redis.go234
-rw-r--r--modules/queue/queue_test.go43
-rw-r--r--modules/queue/queue_wrapped.go206
-rw-r--r--modules/queue/setting.go75
-rw-r--r--modules/queue/workerpool.go325
-rw-r--r--modules/setting/queue.go159
-rw-r--r--modules/setting/setting.go1
-rw-r--r--modules/setting/task.go27
-rw-r--r--modules/task/queue.go14
-rw-r--r--modules/task/queue_channel.go48
-rw-r--r--modules/task/queue_redis.go130
-rw-r--r--modules/task/task.go41
27 files changed, 2453 insertions, 643 deletions
diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go
index a758cfeaee..d0cca4fd18 100644
--- a/modules/indexer/issues/db.go
+++ b/modules/indexer/issues/db.go
@@ -25,6 +25,10 @@ func (db *DBIndexer) Delete(ids ...int64) error {
return nil
}
+// Close dummy function
+func (db *DBIndexer) Close() {
+}
+
// Search dummy function
func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index ebcd3f68dd..894f37a963 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -5,12 +5,16 @@
package issues
import (
+ "context"
+ "fmt"
+ "os"
"sync"
"time"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)
@@ -44,12 +48,14 @@ type Indexer interface {
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
+ Close()
}
type indexerHolder struct {
- indexer Indexer
- mutex sync.RWMutex
- cond *sync.Cond
+ indexer Indexer
+ mutex sync.RWMutex
+ cond *sync.Cond
+ cancelled bool
}
func newIndexerHolder() *indexerHolder {
@@ -58,6 +64,13 @@ func newIndexerHolder() *indexerHolder {
return h
}
+func (h *indexerHolder) cancel() {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+ h.cancelled = true
+ h.cond.Broadcast()
+}
+
func (h *indexerHolder) set(indexer Indexer) {
h.mutex.Lock()
defer h.mutex.Unlock()
@@ -68,16 +81,15 @@ func (h *indexerHolder) set(indexer Indexer) {
func (h *indexerHolder) get() Indexer {
h.mutex.RLock()
defer h.mutex.RUnlock()
- if h.indexer == nil {
+ if h.indexer == nil && !h.cancelled {
h.cond.Wait()
}
return h.indexer
}
var (
- issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
// issueIndexerQueue queue of issue ids to be updated
- issueIndexerQueue Queue
+ issueIndexerQueue queue.Queue
holder = newIndexerHolder()
)
@@ -85,90 +97,99 @@ var (
// all issue index done.
func InitIssueIndexer(syncReindex bool) {
waitChannel := make(chan time.Duration)
+
+ // Create the Queue
+ switch setting.Indexer.IssueType {
+ case "bleve":
+ handler := func(data ...queue.Data) {
+ indexer := holder.get()
+ if indexer == nil {
+ log.Error("Issue indexer handler: unable to get indexer!")
+ return
+ }
+
+ iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
+ for _, datum := range data {
+ indexerData, ok := datum.(*IndexerData)
+ if !ok {
+ log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+ continue
+ }
+ log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
+ if indexerData.IsDelete {
+ _ = indexer.Delete(indexerData.IDs...)
+ continue
+ }
+ iData = append(iData, indexerData)
+ }
+ if err := indexer.Index(iData); err != nil {
+ log.Error("Error whilst indexing: %v Error: %v", iData, err)
+ }
+ }
+
+ issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
+
+ if issueIndexerQueue == nil {
+ log.Fatal("Unable to create issue indexer queue")
+ }
+ default:
+ issueIndexerQueue = &queue.DummyQueue{}
+ }
+
+ // Create the Indexer
go func() {
start := time.Now()
- log.Info("Initializing Issue Indexer")
+ log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
var populate bool
- var dummyQueue bool
switch setting.Indexer.IssueType {
case "bleve":
- issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
- exist, err := issueIndexer.Init()
- if err != nil {
- log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
- }
- populate = !exist
- holder.set(issueIndexer)
+ graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+ issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
+ exist, err := issueIndexer.Init()
+ if err != nil {
+ holder.cancel()
+ log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
+ }
+ populate = !exist
+ holder.set(issueIndexer)
+ atTerminate(context.Background(), func() {
+ log.Debug("Closing issue indexer")
+ issueIndexer := holder.get()
+ if issueIndexer != nil {
+ issueIndexer.Close()
+ }
+ log.Info("PID: %d Issue Indexer closed", os.Getpid())
+ })
+ log.Debug("Created Bleve Indexer")
+ })
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
- dummyQueue = true
default:
+ holder.cancel()
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
- if dummyQueue {
- issueIndexerQueue = &DummyQueue{}
- } else {
- var err error
- switch setting.Indexer.IssueQueueType {
- case setting.LevelQueueType:
- issueIndexerQueue, err = NewLevelQueue(
- holder.get(),
- setting.Indexer.IssueQueueDir,
- setting.Indexer.IssueQueueBatchNumber)
- if err != nil {
- log.Fatal(
- "Unable create level queue for issue queue dir: %s batch number: %d : %v",
- setting.Indexer.IssueQueueDir,
- setting.Indexer.IssueQueueBatchNumber,
- err)
- }
- case setting.ChannelQueueType:
- issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber)
- case setting.RedisQueueType:
- addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
- if err != nil {
- log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
- setting.Indexer.IssueQueueConnStr,
- err)
- }
- issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber)
- if err != nil {
- log.Fatal("Unable to create RedisQueue: %s : %v",
- setting.Indexer.IssueQueueConnStr,
- err)
- }
- default:
- log.Fatal("Unsupported indexer queue type: %v",
- setting.Indexer.IssueQueueType)
- }
-
- go func() {
- err = issueIndexerQueue.Run()
- if err != nil {
- log.Error("issueIndexerQueue.Run: %v", err)
- }
- }()
- }
-
- go func() {
- for data := range issueIndexerChannel {
- _ = issueIndexerQueue.Push(data)
- }
- }()
+ // Start processing the queue
+ go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
+ // Populate the index
if populate {
if syncReindex {
- populateIssueIndexer()
+ graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
} else {
- go populateIssueIndexer()
+ go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
}
}
waitChannel <- time.Since(start)
+ close(waitChannel)
}()
+
if syncReindex {
- <-waitChannel
+ select {
+ case <-waitChannel:
+ case <-graceful.GetManager().IsShutdown():
+ }
} else if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
@@ -178,7 +199,12 @@ func InitIssueIndexer(syncReindex bool) {
select {
case duration := <-waitChannel:
log.Info("Issue Indexer Initialization took %v", duration)
+ case <-graceful.GetManager().IsShutdown():
+ log.Warn("Shutdown occurred before issue index initialisation was complete")
case <-time.After(timeout):
+ if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
+ shutdownable.Terminate()
+ }
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
}
}()
@@ -186,8 +212,14 @@ func InitIssueIndexer(syncReindex bool) {
}
// populateIssueIndexer populate the issue indexer with issue data
-func populateIssueIndexer() {
+func populateIssueIndexer(ctx context.Context) {
for page := 1; ; page++ {
+ select {
+ case <-ctx.Done():
+ log.Warn("Issue Indexer population shutdown before completion")
+ return
+ default:
+ }
repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
Page: page,
PageSize: models.RepositoryListDefaultPageSize,
@@ -200,10 +232,17 @@ func populateIssueIndexer() {
continue
}
if len(repos) == 0 {
+ log.Debug("Issue Indexer population complete")
return
}
for _, repo := range repos {
+ select {
+ case <-ctx.Done():
+ log.Info("Issue Indexer population shutdown before completion")
+ return
+ default:
+ }
UpdateRepoIndexer(repo)
}
}
@@ -237,13 +276,17 @@ func UpdateIssueIndexer(issue *models.Issue) {
comments = append(comments, comment.Content)
}
}
- issueIndexerChannel <- &IndexerData{
+ indexerData := &IndexerData{
ID: issue.ID,
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: comments,
}
+ log.Debug("Adding to channel: %v", indexerData)
+ if err := issueIndexerQueue.Push(indexerData); err != nil {
+ log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+ }
}
// DeleteRepoIssueIndexer deletes repo's all issues indexes
@@ -258,17 +301,25 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
if len(ids) == 0 {
return
}
-
- issueIndexerChannel <- &IndexerData{
+ indexerData := &IndexerData{
IDs: ids,
IsDelete: true,
}
+ if err := issueIndexerQueue.Push(indexerData); err != nil {
+ log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+ }
}
// SearchIssuesByKeyword search issue ids by keywords and repo id
func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
- res, err := holder.get().Search(keyword, repoIDs, 1000, 0)
+ indexer := holder.get()
+
+ if indexer == nil {
+ log.Error("SearchIssuesByKeyword(): unable to get indexer!")
+ return nil, fmt.Errorf("unable to get issue indexer")
+ }
+ res, err := indexer.Search(keyword, repoIDs, 1000, 0)
if err != nil {
return nil, err
}
diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go
index ca7ba29703..4028a6c8b5 100644
--- a/modules/indexer/issues/indexer_test.go
+++ b/modules/indexer/issues/indexer_test.go
@@ -15,6 +15,8 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/setting"
+ "gopkg.in/ini.v1"
+
"github.com/stretchr/testify/assert"
)
@@ -24,6 +26,7 @@ func TestMain(m *testing.M) {
func TestBleveSearchIssues(t *testing.T) {
assert.NoError(t, models.PrepareTestDatabase())
+ setting.Cfg = ini.Empty()
tmpIndexerDir, err := ioutil.TempDir("", "issues-indexer")
if err != nil {
@@ -41,6 +44,7 @@ func TestBleveSearchIssues(t *testing.T) {
}()
setting.Indexer.IssueType = "bleve"
+ setting.NewQueueService()
InitIssueIndexer(true)
defer func() {
indexer := holder.get()
diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go
deleted file mode 100644
index f93e5c47a4..0000000000
--- a/modules/indexer/issues/queue.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2018 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-// Queue defines an interface to save an issue indexer queue
-type Queue interface {
- Run() error
- Push(*IndexerData) error
-}
-
-// DummyQueue represents an empty queue
-type DummyQueue struct {
-}
-
-// Run starts to run the queue
-func (b *DummyQueue) Run() error {
- return nil
-}
-
-// Push pushes data to indexer
-func (b *DummyQueue) Push(*IndexerData) error {
- return nil
-}
diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go
deleted file mode 100644
index b6458d3eb5..0000000000
--- a/modules/indexer/issues/queue_channel.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright 2018 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
- "time"
-
- "code.gitea.io/gitea/modules/setting"
-)
-
-// ChannelQueue implements
-type ChannelQueue struct {
- queue chan *IndexerData
- indexer Indexer
- batchNumber int
-}
-
-// NewChannelQueue create a memory channel queue
-func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
- return &ChannelQueue{
- queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
- indexer: indexer,
- batchNumber: batchNumber,
- }
-}
-
-// Run starts to run the queue
-func (c *ChannelQueue) Run() error {
- var i int
- var datas = make([]*IndexerData, 0, c.batchNumber)
- for {
- select {
- case data := <-c.queue:
- if data.IsDelete {
- _ = c.indexer.Delete(data.IDs...)
- continue
- }
-
- datas = append(datas, data)
- if len(datas) >= c.batchNumber {
- _ = c.indexer.Index(datas)
- // TODO: save the point
- datas = make([]*IndexerData, 0, c.batchNumber)
- }
- case <-time.After(time.Millisecond * 100):
- i++
- if i >= 3 && len(datas) > 0 {
- _ = c.indexer.Index(datas)
- // TODO: save the point
- datas = make([]*IndexerData, 0, c.batchNumber)
- }
- }
- }
-}
-
-// Push will push the indexer data to queue
-func (c *ChannelQueue) Push(data *IndexerData) error {
- c.queue <- data
- return nil
-}
diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go
deleted file mode 100644
index d6187f2acb..0000000000
--- a/modules/indexer/issues/queue_disk.go
+++ /dev/null
@@ -1,104 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
- "encoding/json"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "gitea.com/lunny/levelqueue"
-)
-
-var (
- _ Queue = &LevelQueue{}
-)
-
-// LevelQueue implements a disk library queue
-type LevelQueue struct {
- indexer Indexer
- queue *levelqueue.Queue
- batchNumber int
-}
-
-// NewLevelQueue creates a ledis local queue
-func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
- queue, err := levelqueue.Open(dataDir)
- if err != nil {
- return nil, err
- }
-
- return &LevelQueue{
- indexer: indexer,
- queue: queue,
- batchNumber: batchNumber,
- }, nil
-}
-
-// Run starts to run the queue
-func (l *LevelQueue) Run() error {
- var i int
- var datas = make([]*IndexerData, 0, l.batchNumber)
- for {
- i++
- if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
- _ = l.indexer.Index(datas)
- datas = make([]*IndexerData, 0, l.batchNumber)
- i = 0
- continue
- }
-
- bs, err := l.queue.RPop()
- if err != nil {
- if err != levelqueue.ErrNotFound {
- log.Error("RPop: %v", err)
- }
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- if len(bs) == 0 {
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- var data IndexerData
- err = json.Unmarshal(bs, &data)
- if err != nil {
- log.Error("Unmarshal: %v", err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- log.Trace("LevelQueue: task found: %#v", data)
-
- if data.IsDelete {
- if data.ID > 0 {
- if err = l.indexer.Delete(data.ID); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- } else if len(data.IDs) > 0 {
- if err = l.indexer.Delete(data.IDs...); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- }
- time.Sleep(time.Millisecond * 10)
- continue
- }
-
- datas = append(datas, &data)
- time.Sleep(time.Millisecond * 10)
- }
-}
-
-// Push will push the indexer data to queue
-func (l *LevelQueue) Push(data *IndexerData) error {
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- return l.queue.LPush(bs)
-}
diff --git a/modules/indexer/issues/queue_redis.go b/modules/indexer/issues/queue_redis.go
deleted file mode 100644
index 0344d3c87a..0000000000
--- a/modules/indexer/issues/queue_redis.go
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
- "encoding/json"
- "errors"
- "strconv"
- "strings"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/go-redis/redis"
-)
-
-var (
- _ Queue = &RedisQueue{}
-)
-
-type redisClient interface {
- RPush(key string, args ...interface{}) *redis.IntCmd
- LPop(key string) *redis.StringCmd
- Ping() *redis.StatusCmd
-}
-
-// RedisQueue redis queue
-type RedisQueue struct {
- client redisClient
- queueName string
- indexer Indexer
- batchNumber int
-}
-
-func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
- fields := strings.Fields(connStr)
- for _, f := range fields {
- items := strings.SplitN(f, "=", 2)
- if len(items) < 2 {
- continue
- }
- switch strings.ToLower(items[0]) {
- case "addrs":
- addrs = items[1]
- case "password":
- password = items[1]
- case "db":
- dbIdx, err = strconv.Atoi(items[1])
- if err != nil {
- return
- }
- }
- }
- return
-}
-
-// NewRedisQueue creates single redis or cluster redis queue
-func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) {
- dbs := strings.Split(addrs, ",")
- var queue = RedisQueue{
- queueName: "issue_indexer_queue",
- indexer: indexer,
- batchNumber: batchNumber,
- }
- if len(dbs) == 0 {
- return nil, errors.New("no redis host found")
- } else if len(dbs) == 1 {
- queue.client = redis.NewClient(&redis.Options{
- Addr: strings.TrimSpace(dbs[0]), // use default Addr
- Password: password, // no password set
- DB: dbIdx, // use default DB
- })
- } else {
- queue.client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: dbs,
- })
- }
- if err := queue.client.Ping().Err(); err != nil {
- return nil, err
- }
- return &queue, nil
-}
-
-// Run runs the redis queue
-func (r *RedisQueue) Run() error {
- var i int
- var datas = make([]*IndexerData, 0, r.batchNumber)
- for {
- bs, err := r.client.LPop(r.queueName).Bytes()
- if err != nil && err != redis.Nil {
- log.Error("LPop faile: %v", err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- i++
- if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) {
- _ = r.indexer.Index(datas)
- datas = make([]*IndexerData, 0, r.batchNumber)
- i = 0
- }
-
- if len(bs) == 0 {
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- var data IndexerData
- err = json.Unmarshal(bs, &data)
- if err != nil {
- log.Error("Unmarshal: %v", err)
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- log.Trace("RedisQueue: task found: %#v", data)
-
- if data.IsDelete {
- if data.ID > 0 {
- if err = r.indexer.Delete(data.ID); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- } else if len(data.IDs) > 0 {
- if err = r.indexer.Delete(data.IDs...); err != nil {
- log.Error("indexer.Delete: %v", err)
- }
- }
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- datas = append(datas, &data)
- time.Sleep(time.Millisecond * 100)
- }
-}
-
-// Push implements Queue
-func (r *RedisQueue) Push(data *IndexerData) error {
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- return r.client.RPush(r.queueName, bs).Err()
-}
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
new file mode 100644
index 0000000000..88b2644848
--- /dev/null
+++ b/modules/queue/manager.go
@@ -0,0 +1,270 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sort"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+var manager *Manager
+
+// Manager is a queue manager
+type Manager struct {
+ mutex sync.Mutex
+
+ counter int64
+ Queues map[int64]*ManagedQueue
+}
+
+// ManagedQueue represents a working queue inheriting from Gitea.
+type ManagedQueue struct {
+ mutex sync.Mutex
+ QID int64
+ Queue Queue
+ Type Type
+ Name string
+ Configuration interface{}
+ ExemplarType string
+ Pool ManagedPool
+ counter int64
+ PoolWorkers map[int64]*PoolWorkers
+}
+
+// ManagedPool is a simple interface to get certain details from a worker pool
+type ManagedPool interface {
+ AddWorkers(number int, timeout time.Duration) context.CancelFunc
+ NumberOfWorkers() int
+ MaxNumberOfWorkers() int
+ SetMaxNumberOfWorkers(int)
+ BoostTimeout() time.Duration
+ BlockTimeout() time.Duration
+ BoostWorkers() int
+ SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+}
+
+// ManagedQueueList implements the sort.Interface
+type ManagedQueueList []*ManagedQueue
+
+// PoolWorkers represents a working queue inheriting from Gitea.
+type PoolWorkers struct {
+ PID int64
+ Workers int
+ Start time.Time
+ Timeout time.Time
+ HasTimeout bool
+ Cancel context.CancelFunc
+}
+
+// PoolWorkersList implements the sort.Interface
+type PoolWorkersList []*PoolWorkers
+
+func init() {
+ _ = GetManager()
+}
+
+// GetManager returns a Manager and initializes one as singleton if there's none yet
+func GetManager() *Manager {
+ if manager == nil {
+ manager = &Manager{
+ Queues: make(map[int64]*ManagedQueue),
+ }
+ }
+ return manager
+}
+
+// Add adds a queue to this manager
+func (m *Manager) Add(queue Queue,
+ t Type,
+ configuration,
+ exemplar interface{},
+ pool ManagedPool) int64 {
+
+ cfg, _ := json.Marshal(configuration)
+ mq := &ManagedQueue{
+ Queue: queue,
+ Type: t,
+ Configuration: string(cfg),
+ ExemplarType: reflect.TypeOf(exemplar).String(),
+ PoolWorkers: make(map[int64]*PoolWorkers),
+ Pool: pool,
+ }
+ m.mutex.Lock()
+ m.counter++
+ mq.QID = m.counter
+ mq.Name = fmt.Sprintf("queue-%d", mq.QID)
+ if named, ok := queue.(Named); ok {
+ mq.Name = named.Name()
+ }
+ m.Queues[mq.QID] = mq
+ m.mutex.Unlock()
+ log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
+ return mq.QID
+}
+
+// Remove a queue from the Manager
+func (m *Manager) Remove(qid int64) {
+ m.mutex.Lock()
+ delete(m.Queues, qid)
+ m.mutex.Unlock()
+ log.Trace("Queue Manager removed: QID: %d", qid)
+
+}
+
+// GetManagedQueue by qid
+func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ return m.Queues[qid]
+}
+
+// ManagedQueues returns the managed queues
+func (m *Manager) ManagedQueues() []*ManagedQueue {
+ m.mutex.Lock()
+ mqs := make([]*ManagedQueue, 0, len(m.Queues))
+ for _, mq := range m.Queues {
+ mqs = append(mqs, mq)
+ }
+ m.mutex.Unlock()
+ sort.Sort(ManagedQueueList(mqs))
+ return mqs
+}
+
+// Workers returns the poolworkers
+func (q *ManagedQueue) Workers() []*PoolWorkers {
+ q.mutex.Lock()
+ workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
+ for _, worker := range q.PoolWorkers {
+ workers = append(workers, worker)
+ }
+ q.mutex.Unlock()
+ sort.Sort(PoolWorkersList(workers))
+ return workers
+}
+
+// RegisterWorkers registers workers to this queue
+func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ q.counter++
+ q.PoolWorkers[q.counter] = &PoolWorkers{
+ PID: q.counter,
+ Workers: number,
+ Start: start,
+ Timeout: timeout,
+ HasTimeout: hasTimeout,
+ Cancel: cancel,
+ }
+ return q.counter
+}
+
+// CancelWorkers cancels pooled workers with pid
+func (q *ManagedQueue) CancelWorkers(pid int64) {
+ q.mutex.Lock()
+ pw, ok := q.PoolWorkers[pid]
+ q.mutex.Unlock()
+ if !ok {
+ return
+ }
+ pw.Cancel()
+}
+
+// RemoveWorkers deletes pooled workers with pid
+func (q *ManagedQueue) RemoveWorkers(pid int64) {
+ q.mutex.Lock()
+ pw, ok := q.PoolWorkers[pid]
+ delete(q.PoolWorkers, pid)
+ q.mutex.Unlock()
+ if ok && pw.Cancel != nil {
+ pw.Cancel()
+ }
+}
+
+// AddWorkers adds workers to the queue if it has registered an add worker function
+func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+ if q.Pool != nil {
+ // the cancel will be added to the pool workers description above
+ return q.Pool.AddWorkers(number, timeout)
+ }
+ return nil
+}
+
+// NumberOfWorkers returns the number of workers in the queue
+func (q *ManagedQueue) NumberOfWorkers() int {
+ if q.Pool != nil {
+ return q.Pool.NumberOfWorkers()
+ }
+ return -1
+}
+
+// MaxNumberOfWorkers returns the maximum number of workers for the pool
+func (q *ManagedQueue) MaxNumberOfWorkers() int {
+ if q.Pool != nil {
+ return q.Pool.MaxNumberOfWorkers()
+ }
+ return 0
+}
+
+// BoostWorkers returns the number of workers for a boost
+func (q *ManagedQueue) BoostWorkers() int {
+ if q.Pool != nil {
+ return q.Pool.BoostWorkers()
+ }
+ return -1
+}
+
+// BoostTimeout returns the timeout of the next boost
+func (q *ManagedQueue) BoostTimeout() time.Duration {
+ if q.Pool != nil {
+ return q.Pool.BoostTimeout()
+ }
+ return 0
+}
+
+// BlockTimeout returns the timeout til the next boost
+func (q *ManagedQueue) BlockTimeout() time.Duration {
+ if q.Pool != nil {
+ return q.Pool.BlockTimeout()
+ }
+ return 0
+}
+
+// SetSettings sets the setable boost values
+func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+ if q.Pool != nil {
+ q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
+ }
+}
+
+func (l ManagedQueueList) Len() int {
+ return len(l)
+}
+
+func (l ManagedQueueList) Less(i, j int) bool {
+ return l[i].Name < l[j].Name
+}
+
+func (l ManagedQueueList) Swap(i, j int) {
+ l[i], l[j] = l[j], l[i]
+}
+
+func (l PoolWorkersList) Len() int {
+ return len(l)
+}
+
+func (l PoolWorkersList) Less(i, j int) bool {
+ return l[i].Start.Before(l[j].Start)
+}
+
+func (l PoolWorkersList) Swap(i, j int) {
+ l[i], l[j] = l[j], l[i]
+}
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
new file mode 100644
index 0000000000..d458a7d506
--- /dev/null
+++ b/modules/queue/queue.go
@@ -0,0 +1,133 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "reflect"
+)
+
+// ErrInvalidConfiguration is called when there is invalid configuration for a queue
+type ErrInvalidConfiguration struct {
+ cfg interface{}
+ err error
+}
+
+func (err ErrInvalidConfiguration) Error() string {
+ if err.err != nil {
+ return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err)
+ }
+ return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg)
+}
+
+// IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
+func IsErrInvalidConfiguration(err error) bool {
+ _, ok := err.(ErrInvalidConfiguration)
+ return ok
+}
+
+// Type is a type of Queue
+type Type string
+
+// Data defines an type of queuable data
+type Data interface{}
+
+// HandlerFunc is a function that takes a variable amount of data and processes it
+type HandlerFunc func(...Data)
+
+// NewQueueFunc is a function that creates a queue
+type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
+
+// Shutdownable represents a queue that can be shutdown
+type Shutdownable interface {
+ Shutdown()
+ Terminate()
+}
+
+// Named represents a queue with a name
+type Named interface {
+ Name() string
+}
+
+// Queue defines an interface to save an issue indexer queue
+type Queue interface {
+ Run(atShutdown, atTerminate func(context.Context, func()))
+ Push(Data) error
+}
+
+// DummyQueueType is the type for the dummy queue
+const DummyQueueType Type = "dummy"
+
+// NewDummyQueue creates a new DummyQueue
+func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
+ return &DummyQueue{}, nil
+}
+
+// DummyQueue represents an empty queue
+type DummyQueue struct {
+}
+
+// Run starts to run the queue
+func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
+
+// Push pushes data to the queue
+func (b *DummyQueue) Push(Data) error {
+ return nil
+}
+
+func toConfig(exemplar, cfg interface{}) (interface{}, error) {
+ if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
+ return cfg, nil
+ }
+
+ configBytes, ok := cfg.([]byte)
+ if !ok {
+ configStr, ok := cfg.(string)
+ if !ok {
+ return nil, ErrInvalidConfiguration{cfg: cfg}
+ }
+ configBytes = []byte(configStr)
+ }
+ newVal := reflect.New(reflect.TypeOf(exemplar))
+ if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
+ return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
+ }
+ return newVal.Elem().Interface(), nil
+}
+
+var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
+
+// RegisteredTypes provides the list of requested types of queues
+func RegisteredTypes() []Type {
+ types := make([]Type, len(queuesMap))
+ i := 0
+ for key := range queuesMap {
+ types[i] = key
+ i++
+ }
+ return types
+}
+
+// RegisteredTypesAsString provides the list of requested types of queues
+func RegisteredTypesAsString() []string {
+ types := make([]string, len(queuesMap))
+ i := 0
+ for key := range queuesMap {
+ types[i] = string(key)
+ i++
+ }
+ return types
+}
+
+// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
+func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
+ newFn, ok := queuesMap[queueType]
+ if !ok {
+ return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
+ }
+ return newFn(handlerFunc, opts, exemplar)
+}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
new file mode 100644
index 0000000000..c8f8a53804
--- /dev/null
+++ b/modules/queue/queue_channel.go
@@ -0,0 +1,106 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// ChannelQueueType is the type for channel queue
+const ChannelQueueType Type = "channel"
+
+// ChannelQueueConfiguration is the configuration for a ChannelQueue
+type ChannelQueueConfiguration struct {
+ QueueLength int
+ BatchLength int
+ Workers int
+ MaxWorkers int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+ Name string
+}
+
+// ChannelQueue implements
+type ChannelQueue struct {
+ pool *WorkerPool
+ exemplar interface{}
+ workers int
+ name string
+}
+
+// NewChannelQueue create a memory channel queue
+func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(ChannelQueueConfiguration)
+ if config.BatchLength == 0 {
+ config.BatchLength = 1
+ }
+ dataChan := make(chan Data, config.QueueLength)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ queue := &ChannelQueue{
+ pool: &WorkerPool{
+ baseCtx: ctx,
+ cancel: cancel,
+ batchLength: config.BatchLength,
+ handle: handle,
+ dataChan: dataChan,
+ blockTimeout: config.BlockTimeout,
+ boostTimeout: config.BoostTimeout,
+ boostWorkers: config.BoostWorkers,
+ maxNumberOfWorkers: config.MaxWorkers,
+ },
+ exemplar: exemplar,
+ workers: config.Workers,
+ name: config.Name,
+ }
+ queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool)
+ return queue, nil
+}
+
+// Run starts to run the queue
+func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ atShutdown(context.Background(), func() {
+ log.Warn("ChannelQueue: %s is not shutdownable!", c.name)
+ })
+ atTerminate(context.Background(), func() {
+ log.Warn("ChannelQueue: %s is not terminatable!", c.name)
+ })
+ go func() {
+ _ = c.pool.AddWorkers(c.workers, 0)
+ }()
+}
+
+// Push will push data into the queue
+func (c *ChannelQueue) Push(data Data) error {
+ if c.exemplar != nil {
+ // Assert data is of same type as r.exemplar
+ t := reflect.TypeOf(data)
+ exemplarType := reflect.TypeOf(c.exemplar)
+ if !t.AssignableTo(exemplarType) || data == nil {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
+ }
+ }
+ c.pool.Push(data)
+ return nil
+}
+
+// Name returns the name of this queue
+func (c *ChannelQueue) Name() string {
+ return c.name
+}
+
+func init() {
+ queuesMap[ChannelQueueType] = NewChannelQueue
+}
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
new file mode 100644
index 0000000000..fafc1e3303
--- /dev/null
+++ b/modules/queue/queue_channel_test.go
@@ -0,0 +1,91 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestChannelQueue(t *testing.T) {
+ handleChan := make(chan *testData)
+ handle := func(data ...Data) {
+ for _, datum := range data {
+ testDatum := datum.(*testData)
+ handleChan <- testDatum
+ }
+ }
+
+ nilFn := func(_ context.Context, _ func()) {}
+
+ queue, err := NewChannelQueue(handle,
+ ChannelQueueConfiguration{
+ QueueLength: 20,
+ Workers: 1,
+ MaxWorkers: 10,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(nilFn, nilFn)
+
+ test1 := testData{"A", 1}
+ go queue.Push(&test1)
+ result1 := <-handleChan
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ err = queue.Push(test1)
+ assert.Error(t, err)
+}
+
+func TestChannelQueue_Batch(t *testing.T) {
+ handleChan := make(chan *testData)
+ handle := func(data ...Data) {
+ assert.True(t, len(data) == 2)
+ for _, datum := range data {
+ testDatum := datum.(*testData)
+ handleChan <- testDatum
+ }
+ }
+
+ nilFn := func(_ context.Context, _ func()) {}
+
+ queue, err := NewChannelQueue(handle,
+ ChannelQueueConfiguration{
+ QueueLength: 20,
+ BatchLength: 2,
+ Workers: 1,
+ MaxWorkers: 10,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(nilFn, nilFn)
+
+ test1 := testData{"A", 1}
+ test2 := testData{"B", 2}
+
+ queue.Push(&test1)
+ go queue.Push(&test2)
+
+ result1 := <-handleChan
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ result2 := <-handleChan
+ assert.Equal(t, test2.TestString, result2.TestString)
+ assert.Equal(t, test2.TestInt, result2.TestInt)
+
+ err = queue.Push(test1)
+ assert.Error(t, err)
+}
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
new file mode 100644
index 0000000000..98e7b24e42
--- /dev/null
+++ b/modules/queue/queue_disk.go
@@ -0,0 +1,213 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+
+ "gitea.com/lunny/levelqueue"
+)
+
+// LevelQueueType is the type for level queue
+const LevelQueueType Type = "level"
+
+// LevelQueueConfiguration is the configuration for a LevelQueue
+type LevelQueueConfiguration struct {
+ DataDir string
+ QueueLength int
+ BatchLength int
+ Workers int
+ MaxWorkers int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+ Name string
+}
+
+// LevelQueue implements a disk library queue
+type LevelQueue struct {
+ pool *WorkerPool
+ queue *levelqueue.Queue
+ closed chan struct{}
+ terminated chan struct{}
+ lock sync.Mutex
+ exemplar interface{}
+ workers int
+ name string
+}
+
+// NewLevelQueue creates a ledis local queue
+func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(LevelQueueConfiguration)
+
+ internal, err := levelqueue.Open(config.DataDir)
+ if err != nil {
+ return nil, err
+ }
+
+ dataChan := make(chan Data, config.QueueLength)
+ ctx, cancel := context.WithCancel(context.Background())
+
+ queue := &LevelQueue{
+ pool: &WorkerPool{
+ baseCtx: ctx,
+ cancel: cancel,
+ batchLength: config.BatchLength,
+ handle: handle,
+ dataChan: dataChan,
+ blockTimeout: config.BlockTimeout,
+ boostTimeout: config.BoostTimeout,
+ boostWorkers: config.BoostWorkers,
+ maxNumberOfWorkers: config.MaxWorkers,
+ },
+ queue: internal,
+ exemplar: exemplar,
+ closed: make(chan struct{}),
+ terminated: make(chan struct{}),
+ workers: config.Workers,
+ name: config.Name,
+ }
+ queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool)
+ return queue, nil
+}
+
+// Run starts to run the queue
+func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ atShutdown(context.Background(), l.Shutdown)
+ atTerminate(context.Background(), l.Terminate)
+
+ go func() {
+ _ = l.pool.AddWorkers(l.workers, 0)
+ }()
+
+ go l.readToChan()
+
+ log.Trace("LevelQueue: %s Waiting til closed", l.name)
+ <-l.closed
+
+ log.Trace("LevelQueue: %s Waiting til done", l.name)
+ l.pool.Wait()
+
+ log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
+ ctx, cancel := context.WithCancel(context.Background())
+ atTerminate(ctx, cancel)
+ l.pool.CleanUp(ctx)
+ cancel()
+ log.Trace("LevelQueue: %s Cleaned", l.name)
+
+}
+
+func (l *LevelQueue) readToChan() {
+ for {
+ select {
+ case <-l.closed:
+ // tell the pool to shutdown.
+ l.pool.cancel()
+ return
+ default:
+ bs, err := l.queue.RPop()
+ if err != nil {
+ if err != levelqueue.ErrNotFound {
+ log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
+ }
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ if len(bs) == 0 {
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ var data Data
+ if l.exemplar != nil {
+ t := reflect.TypeOf(l.exemplar)
+ n := reflect.New(t)
+ ne := n.Elem()
+ err = json.Unmarshal(bs, ne.Addr().Interface())
+ data = ne.Interface().(Data)
+ } else {
+ err = json.Unmarshal(bs, &data)
+ }
+ if err != nil {
+ log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
+ l.pool.Push(data)
+
+ }
+ }
+}
+
+// Push will push the indexer data to queue
+func (l *LevelQueue) Push(data Data) error {
+ if l.exemplar != nil {
+ // Assert data is of same type as r.exemplar
+ value := reflect.ValueOf(data)
+ t := value.Type()
+ exemplarType := reflect.ValueOf(l.exemplar).Type()
+ if !t.AssignableTo(exemplarType) || data == nil {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
+ }
+ }
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ return l.queue.LPush(bs)
+}
+
+// Shutdown this queue and stop processing
+func (l *LevelQueue) Shutdown() {
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ log.Trace("LevelQueue: %s Shutdown", l.name)
+ select {
+ case <-l.closed:
+ default:
+ close(l.closed)
+ }
+}
+
+// Terminate this queue and close the queue
+func (l *LevelQueue) Terminate() {
+ log.Trace("LevelQueue: %s Terminating", l.name)
+ l.Shutdown()
+ l.lock.Lock()
+ select {
+ case <-l.terminated:
+ l.lock.Unlock()
+ default:
+ close(l.terminated)
+ l.lock.Unlock()
+ if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
+ log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
+ }
+
+ }
+}
+
+// Name returns the name of this queue
+func (l *LevelQueue) Name() string {
+ return l.name
+}
+
+func init() {
+ queuesMap[LevelQueueType] = NewLevelQueue
+}
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
new file mode 100644
index 0000000000..895c8ce918
--- /dev/null
+++ b/modules/queue/queue_disk_channel.go
@@ -0,0 +1,193 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// PersistableChannelQueueType is the type for persistable queue
+const PersistableChannelQueueType Type = "persistable-channel"
+
+// PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
+type PersistableChannelQueueConfiguration struct {
+ Name string
+ DataDir string
+ BatchLength int
+ QueueLength int
+ Timeout time.Duration
+ MaxAttempts int
+ Workers int
+ MaxWorkers int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+}
+
+// PersistableChannelQueue wraps a channel queue and level queue together
+type PersistableChannelQueue struct {
+ *ChannelQueue
+ delayedStarter
+ closed chan struct{}
+}
+
+// NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
+// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
+func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(PersistableChannelQueueConfiguration)
+
+ channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
+ QueueLength: config.QueueLength,
+ BatchLength: config.BatchLength,
+ Workers: config.Workers,
+ MaxWorkers: config.MaxWorkers,
+ BlockTimeout: config.BlockTimeout,
+ BoostTimeout: config.BoostTimeout,
+ BoostWorkers: config.BoostWorkers,
+ Name: config.Name + "-channel",
+ }, exemplar)
+ if err != nil {
+ return nil, err
+ }
+
+ // the level backend only needs temporary workers to catch up with the previously dropped work
+ levelCfg := LevelQueueConfiguration{
+ DataDir: config.DataDir,
+ QueueLength: config.QueueLength,
+ BatchLength: config.BatchLength,
+ Workers: 1,
+ MaxWorkers: 6,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ Name: config.Name + "-level",
+ }
+
+ levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
+ if err == nil {
+ queue := &PersistableChannelQueue{
+ ChannelQueue: channelQueue.(*ChannelQueue),
+ delayedStarter: delayedStarter{
+ internal: levelQueue.(*LevelQueue),
+ name: config.Name,
+ },
+ closed: make(chan struct{}),
+ }
+ _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+ return queue, nil
+ }
+ if IsErrInvalidConfiguration(err) {
+ // Retrying ain't gonna make this any better...
+ return nil, ErrInvalidConfiguration{cfg: cfg}
+ }
+
+ queue := &PersistableChannelQueue{
+ ChannelQueue: channelQueue.(*ChannelQueue),
+ delayedStarter: delayedStarter{
+ cfg: levelCfg,
+ underlying: LevelQueueType,
+ timeout: config.Timeout,
+ maxAttempts: config.MaxAttempts,
+ name: config.Name,
+ },
+ closed: make(chan struct{}),
+ }
+ _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+ return queue, nil
+}
+
+// Name returns the name of this queue
+func (p *PersistableChannelQueue) Name() string {
+ return p.delayedStarter.name
+}
+
+// Push will push the indexer data to queue
+func (p *PersistableChannelQueue) Push(data Data) error {
+ select {
+ case <-p.closed:
+ return p.internal.Push(data)
+ default:
+ return p.ChannelQueue.Push(data)
+ }
+}
+
+// Run starts to run the queue
+func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ p.lock.Lock()
+ if p.internal == nil {
+ err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
+ p.lock.Unlock()
+ if err != nil {
+ log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
+ return
+ }
+ } else {
+ p.lock.Unlock()
+ }
+ atShutdown(context.Background(), p.Shutdown)
+ atTerminate(context.Background(), p.Terminate)
+
+ // Just run the level queue - we shut it down later
+ go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
+
+ go func() {
+ _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
+ }()
+
+ log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
+ <-p.closed
+ log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
+ p.ChannelQueue.pool.cancel()
+ p.internal.(*LevelQueue).pool.cancel()
+ log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
+ p.ChannelQueue.pool.Wait()
+ p.internal.(*LevelQueue).pool.Wait()
+ // Redirect all remaining data in the chan to the internal channel
+ go func() {
+ log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
+ for data := range p.ChannelQueue.pool.dataChan {
+ _ = p.internal.Push(data)
+ }
+ log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
+ }()
+ log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
+}
+
+// Shutdown processing this queue
+func (p *PersistableChannelQueue) Shutdown() {
+ log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
+ select {
+ case <-p.closed:
+ default:
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ if p.internal != nil {
+ p.internal.(*LevelQueue).Shutdown()
+ }
+ close(p.closed)
+ }
+}
+
+// Terminate this queue and close the queue
+func (p *PersistableChannelQueue) Terminate() {
+ log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
+ p.Shutdown()
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ if p.internal != nil {
+ p.internal.(*LevelQueue).Terminate()
+ }
+}
+
+func init() {
+ queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
+}
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
new file mode 100644
index 0000000000..4ef68961c6
--- /dev/null
+++ b/modules/queue/queue_disk_channel_test.go
@@ -0,0 +1,117 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestPersistableChannelQueue(t *testing.T) {
+ handleChan := make(chan *testData)
+ handle := func(data ...Data) {
+ assert.True(t, len(data) == 2)
+ for _, datum := range data {
+ testDatum := datum.(*testData)
+ handleChan <- testDatum
+ }
+ }
+
+ queueShutdown := []func(){}
+ queueTerminate := []func(){}
+
+ tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-test-data")
+ assert.NoError(t, err)
+ defer os.RemoveAll(tmpDir)
+
+ queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+ DataDir: tmpDir,
+ BatchLength: 2,
+ QueueLength: 20,
+ Workers: 1,
+ MaxWorkers: 10,
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(func(_ context.Context, shutdown func()) {
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(_ context.Context, terminate func()) {
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
+ test1 := testData{"A", 1}
+ test2 := testData{"B", 2}
+
+ err = queue.Push(&test1)
+ assert.NoError(t, err)
+ go func() {
+ err = queue.Push(&test2)
+ assert.NoError(t, err)
+ }()
+
+ result1 := <-handleChan
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ result2 := <-handleChan
+ assert.Equal(t, test2.TestString, result2.TestString)
+ assert.Equal(t, test2.TestInt, result2.TestInt)
+
+ err = queue.Push(test1)
+ assert.Error(t, err)
+
+ for _, callback := range queueShutdown {
+ callback()
+ }
+ time.Sleep(200 * time.Millisecond)
+ err = queue.Push(&test1)
+ assert.NoError(t, err)
+ err = queue.Push(&test2)
+ assert.NoError(t, err)
+ select {
+ case <-handleChan:
+ assert.Fail(t, "Handler processing should have stopped")
+ default:
+ }
+ for _, callback := range queueTerminate {
+ callback()
+ }
+
+ // Reopen queue
+ queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+ DataDir: tmpDir,
+ BatchLength: 2,
+ QueueLength: 20,
+ Workers: 1,
+ MaxWorkers: 10,
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(func(_ context.Context, shutdown func()) {
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(_ context.Context, terminate func()) {
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
+ result3 := <-handleChan
+ assert.Equal(t, test1.TestString, result3.TestString)
+ assert.Equal(t, test1.TestInt, result3.TestInt)
+
+ result4 := <-handleChan
+ assert.Equal(t, test2.TestString, result4.TestString)
+ assert.Equal(t, test2.TestInt, result4.TestInt)
+ for _, callback := range queueShutdown {
+ callback()
+ }
+ for _, callback := range queueTerminate {
+ callback()
+ }
+
+}
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
new file mode 100644
index 0000000000..c5959d606f
--- /dev/null
+++ b/modules/queue/queue_disk_test.go
@@ -0,0 +1,126 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestLevelQueue(t *testing.T) {
+ handleChan := make(chan *testData)
+ handle := func(data ...Data) {
+ assert.True(t, len(data) == 2)
+ for _, datum := range data {
+ testDatum := datum.(*testData)
+ handleChan <- testDatum
+ }
+ }
+
+ queueShutdown := []func(){}
+ queueTerminate := []func(){}
+
+ tmpDir, err := ioutil.TempDir("", "level-queue-test-data")
+ assert.NoError(t, err)
+ defer os.RemoveAll(tmpDir)
+
+ queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
+ DataDir: tmpDir,
+ BatchLength: 2,
+ Workers: 1,
+ MaxWorkers: 10,
+ QueueLength: 20,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(func(_ context.Context, shutdown func()) {
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(_ context.Context, terminate func()) {
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
+ test1 := testData{"A", 1}
+ test2 := testData{"B", 2}
+
+ err = queue.Push(&test1)
+ assert.NoError(t, err)
+ go func() {
+ err = queue.Push(&test2)
+ assert.NoError(t, err)
+ }()
+
+ result1 := <-handleChan
+ assert.Equal(t, test1.TestString, result1.TestString)
+ assert.Equal(t, test1.TestInt, result1.TestInt)
+
+ result2 := <-handleChan
+ assert.Equal(t, test2.TestString, result2.TestString)
+ assert.Equal(t, test2.TestInt, result2.TestInt)
+
+ err = queue.Push(test1)
+ assert.Error(t, err)
+
+ for _, callback := range queueShutdown {
+ callback()
+ }
+ time.Sleep(200 * time.Millisecond)
+ err = queue.Push(&test1)
+ assert.NoError(t, err)
+ err = queue.Push(&test2)
+ assert.NoError(t, err)
+ select {
+ case <-handleChan:
+ assert.Fail(t, "Handler processing should have stopped")
+ default:
+ }
+ for _, callback := range queueTerminate {
+ callback()
+ }
+
+ // Reopen queue
+ queue, err = NewWrappedQueue(handle,
+ WrappedQueueConfiguration{
+ Underlying: LevelQueueType,
+ Config: LevelQueueConfiguration{
+ DataDir: tmpDir,
+ BatchLength: 2,
+ Workers: 1,
+ MaxWorkers: 10,
+ QueueLength: 20,
+ BlockTimeout: 1 * time.Second,
+ BoostTimeout: 5 * time.Minute,
+ BoostWorkers: 5,
+ },
+ }, &testData{})
+ assert.NoError(t, err)
+
+ go queue.Run(func(_ context.Context, shutdown func()) {
+ queueShutdown = append(queueShutdown, shutdown)
+ }, func(_ context.Context, terminate func()) {
+ queueTerminate = append(queueTerminate, terminate)
+ })
+
+ result3 := <-handleChan
+ assert.Equal(t, test1.TestString, result3.TestString)
+ assert.Equal(t, test1.TestInt, result3.TestInt)
+
+ result4 := <-handleChan
+ assert.Equal(t, test2.TestString, result4.TestString)
+ assert.Equal(t, test2.TestInt, result4.TestInt)
+ for _, callback := range queueShutdown {
+ callback()
+ }
+ for _, callback := range queueTerminate {
+ callback()
+ }
+}
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
new file mode 100644
index 0000000000..14e68937a5
--- /dev/null
+++ b/modules/queue/queue_redis.go
@@ -0,0 +1,234 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+
+ "github.com/go-redis/redis"
+)
+
+// RedisQueueType is the type for redis queue
+const RedisQueueType Type = "redis"
+
+type redisClient interface {
+ RPush(key string, args ...interface{}) *redis.IntCmd
+ LPop(key string) *redis.StringCmd
+ Ping() *redis.StatusCmd
+ Close() error
+}
+
+// RedisQueue redis queue
+type RedisQueue struct {
+ pool *WorkerPool
+ client redisClient
+ queueName string
+ closed chan struct{}
+ terminated chan struct{}
+ exemplar interface{}
+ workers int
+ name string
+ lock sync.Mutex
+}
+
+// RedisQueueConfiguration is the configuration for the redis queue
+type RedisQueueConfiguration struct {
+ Network string
+ Addresses string
+ Password string
+ DBIndex int
+ BatchLength int
+ QueueLength int
+ QueueName string
+ Workers int
+ MaxWorkers int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+ Name string
+}
+
+// NewRedisQueue creates single redis or cluster redis queue
+func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(RedisQueueConfiguration)
+
+ dbs := strings.Split(config.Addresses, ",")
+
+ dataChan := make(chan Data, config.QueueLength)
+ ctx, cancel := context.WithCancel(context.Background())
+
+ var queue = &RedisQueue{
+ pool: &WorkerPool{
+ baseCtx: ctx,
+ cancel: cancel,
+ batchLength: config.BatchLength,
+ handle: handle,
+ dataChan: dataChan,
+ blockTimeout: config.BlockTimeout,
+ boostTimeout: config.BoostTimeout,
+ boostWorkers: config.BoostWorkers,
+ maxNumberOfWorkers: config.MaxWorkers,
+ },
+ queueName: config.QueueName,
+ exemplar: exemplar,
+ closed: make(chan struct{}),
+ workers: config.Workers,
+ name: config.Name,
+ }
+ if len(dbs) == 0 {
+ return nil, errors.New("no redis host specified")
+ } else if len(dbs) == 1 {
+ queue.client = redis.NewClient(&redis.Options{
+ Network: config.Network,
+ Addr: strings.TrimSpace(dbs[0]), // use default Addr
+ Password: config.Password, // no password set
+ DB: config.DBIndex, // use default DB
+ })
+ } else {
+ queue.client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: dbs,
+ })
+ }
+ if err := queue.client.Ping().Err(); err != nil {
+ return nil, err
+ }
+ queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool)
+
+ return queue, nil
+}
+
+// Run runs the redis queue
+func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ atShutdown(context.Background(), r.Shutdown)
+ atTerminate(context.Background(), r.Terminate)
+
+ go func() {
+ _ = r.pool.AddWorkers(r.workers, 0)
+ }()
+
+ go r.readToChan()
+
+ log.Trace("RedisQueue: %s Waiting til closed", r.name)
+ <-r.closed
+ log.Trace("RedisQueue: %s Waiting til done", r.name)
+ r.pool.Wait()
+
+ log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
+ ctx, cancel := context.WithCancel(context.Background())
+ atTerminate(ctx, cancel)
+ r.pool.CleanUp(ctx)
+ cancel()
+}
+
+func (r *RedisQueue) readToChan() {
+ for {
+ select {
+ case <-r.closed:
+ // tell the pool to shutdown
+ r.pool.cancel()
+ return
+ default:
+ bs, err := r.client.LPop(r.queueName).Bytes()
+ if err != nil && err != redis.Nil {
+ log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ if len(bs) == 0 {
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ var data Data
+ if r.exemplar != nil {
+ t := reflect.TypeOf(r.exemplar)
+ n := reflect.New(t)
+ ne := n.Elem()
+ err = json.Unmarshal(bs, ne.Addr().Interface())
+ data = ne.Interface().(Data)
+ } else {
+ err = json.Unmarshal(bs, &data)
+ }
+ if err != nil {
+ log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
+ r.pool.Push(data)
+ }
+ }
+}
+
+// Push implements Queue
+func (r *RedisQueue) Push(data Data) error {
+ if r.exemplar != nil {
+ // Assert data is of same type as r.exemplar
+ value := reflect.ValueOf(data)
+ t := value.Type()
+ exemplarType := reflect.ValueOf(r.exemplar).Type()
+ if !t.AssignableTo(exemplarType) || data == nil {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
+ }
+ }
+ bs, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ return r.client.RPush(r.queueName, bs).Err()
+}
+
+// Shutdown processing from this queue
+func (r *RedisQueue) Shutdown() {
+ log.Trace("Shutdown: %s", r.name)
+ r.lock.Lock()
+ select {
+ case <-r.closed:
+ default:
+ close(r.closed)
+ }
+ r.lock.Unlock()
+}
+
+// Terminate this queue and close the queue
+func (r *RedisQueue) Terminate() {
+ log.Trace("Terminating: %s", r.name)
+ r.Shutdown()
+ r.lock.Lock()
+ select {
+ case <-r.terminated:
+ r.lock.Unlock()
+ default:
+ close(r.terminated)
+ r.lock.Unlock()
+ if err := r.client.Close(); err != nil {
+ log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
+ }
+ }
+}
+
+// Name returns the name of this queue
+func (r *RedisQueue) Name() string {
+ return r.name
+}
+
+func init() {
+ queuesMap[RedisQueueType] = NewRedisQueue
+}
diff --git a/modules/queue/queue_test.go b/modules/queue/queue_test.go
new file mode 100644
index 0000000000..3608f68d3d
--- /dev/null
+++ b/modules/queue/queue_test.go
@@ -0,0 +1,43 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+type testData struct {
+ TestString string
+ TestInt int
+}
+
+func TestToConfig(t *testing.T) {
+ cfg := testData{
+ TestString: "Config",
+ TestInt: 10,
+ }
+ exemplar := testData{}
+
+ cfg2I, err := toConfig(exemplar, cfg)
+ assert.NoError(t, err)
+ cfg2, ok := (cfg2I).(testData)
+ assert.True(t, ok)
+ assert.NotEqual(t, cfg2, exemplar)
+ assert.Equal(t, &cfg, &cfg2)
+
+ cfgString, err := json.Marshal(cfg)
+ assert.NoError(t, err)
+
+ cfg3I, err := toConfig(exemplar, cfgString)
+ assert.NoError(t, err)
+ cfg3, ok := (cfg3I).(testData)
+ assert.True(t, ok)
+ assert.Equal(t, cfg.TestString, cfg3.TestString)
+ assert.Equal(t, cfg.TestInt, cfg3.TestInt)
+ assert.NotEqual(t, cfg3, exemplar)
+}
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go
new file mode 100644
index 0000000000..d0b93b54d0
--- /dev/null
+++ b/modules/queue/queue_wrapped.go
@@ -0,0 +1,206 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// WrappedQueueType is the type for a wrapped delayed starting queue
+const WrappedQueueType Type = "wrapped"
+
+// WrappedQueueConfiguration is the configuration for a WrappedQueue
+type WrappedQueueConfiguration struct {
+ Underlying Type
+ Timeout time.Duration
+ MaxAttempts int
+ Config interface{}
+ QueueLength int
+ Name string
+}
+
+type delayedStarter struct {
+ lock sync.Mutex
+ internal Queue
+ underlying Type
+ cfg interface{}
+ timeout time.Duration
+ maxAttempts int
+ name string
+}
+
+// setInternal must be called with the lock locked.
+func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error {
+ var ctx context.Context
+ var cancel context.CancelFunc
+ if q.timeout > 0 {
+ ctx, cancel = context.WithTimeout(context.Background(), q.timeout)
+ } else {
+ ctx, cancel = context.WithCancel(context.Background())
+ }
+
+ defer cancel()
+ // Ensure we also stop at shutdown
+ atShutdown(ctx, func() {
+ cancel()
+ })
+
+ i := 1
+ for q.internal == nil {
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name)
+ default:
+ queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
+ if err == nil {
+ q.internal = queue
+ q.lock.Unlock()
+ break
+ }
+ if err.Error() != "resource temporarily unavailable" {
+ log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %v error: %v", i, q.underlying, q.name, q.cfg, err)
+ }
+ i++
+ if q.maxAttempts > 0 && i > q.maxAttempts {
+ return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
+ }
+ sleepTime := 100 * time.Millisecond
+ if q.timeout > 0 && q.maxAttempts > 0 {
+ sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts)
+ }
+ t := time.NewTimer(sleepTime)
+ select {
+ case <-ctx.Done():
+ t.Stop()
+ case <-t.C:
+ }
+ }
+ }
+ return nil
+}
+
+// WrappedQueue wraps a delayed starting queue
+type WrappedQueue struct {
+ delayedStarter
+ handle HandlerFunc
+ exemplar interface{}
+ channel chan Data
+}
+
+// NewWrappedQueue will attempt to create a queue of the provided type,
+// but if there is a problem creating this queue it will instead create
+// a WrappedQueue with delayed startup of the queue instead and a
+// channel which will be redirected to the queue
+func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+ configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg)
+ if err != nil {
+ return nil, err
+ }
+ config := configInterface.(WrappedQueueConfiguration)
+
+ queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
+ if err == nil {
+ // Just return the queue there is no need to wrap
+ return queue, nil
+ }
+ if IsErrInvalidConfiguration(err) {
+ // Retrying ain't gonna make this any better...
+ return nil, ErrInvalidConfiguration{cfg: cfg}
+ }
+
+ queue = &WrappedQueue{
+ handle: handle,
+ channel: make(chan Data, config.QueueLength),
+ exemplar: exemplar,
+ delayedStarter: delayedStarter{
+ cfg: config.Config,
+ underlying: config.Underlying,
+ timeout: config.Timeout,
+ maxAttempts: config.MaxAttempts,
+ name: config.Name,
+ },
+ }
+ _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil)
+ return queue, nil
+}
+
+// Name returns the name of the queue
+func (q *WrappedQueue) Name() string {
+ return q.name + "-wrapper"
+}
+
+// Push will push the data to the internal channel checking it against the exemplar
+func (q *WrappedQueue) Push(data Data) error {
+ if q.exemplar != nil {
+ // Assert data is of same type as r.exemplar
+ value := reflect.ValueOf(data)
+ t := value.Type()
+ exemplarType := reflect.ValueOf(q.exemplar).Type()
+ if !t.AssignableTo(exemplarType) || data == nil {
+ return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+ }
+ }
+ q.channel <- data
+ return nil
+}
+
+// Run starts to run the queue and attempts to create the internal queue
+func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+ q.lock.Lock()
+ if q.internal == nil {
+ err := q.setInternal(atShutdown, q.handle, q.exemplar)
+ q.lock.Unlock()
+ if err != nil {
+ log.Fatal("Unable to set the internal queue for %s Error: %v", q.Name(), err)
+ return
+ }
+ go func() {
+ for data := range q.channel {
+ _ = q.internal.Push(data)
+ }
+ }()
+ } else {
+ q.lock.Unlock()
+ }
+
+ q.internal.Run(atShutdown, atTerminate)
+ log.Trace("WrappedQueue: %s Done", q.name)
+}
+
+// Shutdown this queue and stop processing
+func (q *WrappedQueue) Shutdown() {
+ log.Trace("WrappedQueue: %s Shutdown", q.name)
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if q.internal == nil {
+ return
+ }
+ if shutdownable, ok := q.internal.(Shutdownable); ok {
+ shutdownable.Shutdown()
+ }
+}
+
+// Terminate this queue and close the queue
+func (q *WrappedQueue) Terminate() {
+ log.Trace("WrappedQueue: %s Terminating", q.name)
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ if q.internal == nil {
+ return
+ }
+ if shutdownable, ok := q.internal.(Shutdownable); ok {
+ shutdownable.Terminate()
+ }
+}
+
+func init() {
+ queuesMap[WrappedQueueType] = NewWrappedQueue
+}
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
new file mode 100644
index 0000000000..d5a6b41882
--- /dev/null
+++ b/modules/queue/setting.go
@@ -0,0 +1,75 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/setting"
+)
+
+func validType(t string) (Type, error) {
+ if len(t) == 0 {
+ return PersistableChannelQueueType, nil
+ }
+ for _, typ := range RegisteredTypes() {
+ if t == string(typ) {
+ return typ, nil
+ }
+ }
+ return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
+}
+
+// CreateQueue for name with provided handler and exemplar
+func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
+ q := setting.GetQueueSettings(name)
+ opts := make(map[string]interface{})
+ opts["Name"] = name
+ opts["QueueLength"] = q.Length
+ opts["BatchLength"] = q.BatchLength
+ opts["DataDir"] = q.DataDir
+ opts["Addresses"] = q.Addresses
+ opts["Network"] = q.Network
+ opts["Password"] = q.Password
+ opts["DBIndex"] = q.DBIndex
+ opts["QueueName"] = q.QueueName
+ opts["Workers"] = q.Workers
+ opts["MaxWorkers"] = q.MaxWorkers
+ opts["BlockTimeout"] = q.BlockTimeout
+ opts["BoostTimeout"] = q.BoostTimeout
+ opts["BoostWorkers"] = q.BoostWorkers
+
+ typ, err := validType(q.Type)
+ if err != nil {
+ log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
+ }
+
+ cfg, err := json.Marshal(opts)
+ if err != nil {
+ log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
+ log.Error("Unable to create queue for %s", name, err)
+ return nil
+ }
+
+ returnable, err := NewQueue(typ, handle, cfg, exemplar)
+ if q.WrapIfNecessary && err != nil {
+ log.Warn("Unable to create queue for %s: %v", name, err)
+ log.Warn("Attempting to create wrapped queue")
+ returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{
+ Underlying: Type(q.Type),
+ Timeout: q.Timeout,
+ MaxAttempts: q.MaxAttempts,
+ Config: cfg,
+ QueueLength: q.Length,
+ }, exemplar)
+ }
+ if err != nil {
+ log.Error("Unable to create queue for %s: %v", name, err)
+ return nil
+ }
+ return returnable
+}
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
new file mode 100644
index 0000000000..25fc7dd644
--- /dev/null
+++ b/modules/queue/workerpool.go
@@ -0,0 +1,325 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// WorkerPool takes
+type WorkerPool struct {
+ lock sync.Mutex
+ baseCtx context.Context
+ cancel context.CancelFunc
+ cond *sync.Cond
+ qid int64
+ maxNumberOfWorkers int
+ numberOfWorkers int
+ batchLength int
+ handle HandlerFunc
+ dataChan chan Data
+ blockTimeout time.Duration
+ boostTimeout time.Duration
+ boostWorkers int
+}
+
+// Push pushes the data to the internal channel
+func (p *WorkerPool) Push(data Data) {
+ p.lock.Lock()
+ if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
+ p.lock.Unlock()
+ p.pushBoost(data)
+ } else {
+ p.lock.Unlock()
+ p.dataChan <- data
+ }
+}
+
+func (p *WorkerPool) pushBoost(data Data) {
+ select {
+ case p.dataChan <- data:
+ default:
+ p.lock.Lock()
+ if p.blockTimeout <= 0 {
+ p.lock.Unlock()
+ p.dataChan <- data
+ return
+ }
+ ourTimeout := p.blockTimeout
+ timer := time.NewTimer(p.blockTimeout)
+ p.lock.Unlock()
+ select {
+ case p.dataChan <- data:
+ if timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
+ case <-timer.C:
+ p.lock.Lock()
+ if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) {
+ p.lock.Unlock()
+ p.dataChan <- data
+ return
+ }
+ p.blockTimeout *= 2
+ ctx, cancel := context.WithCancel(p.baseCtx)
+ mq := GetManager().GetManagedQueue(p.qid)
+ boost := p.boostWorkers
+ if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
+ boost = p.maxNumberOfWorkers - p.numberOfWorkers
+ }
+ if mq != nil {
+ log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
+
+ start := time.Now()
+ pid := mq.RegisterWorkers(boost, start, false, start, cancel)
+ go func() {
+ <-ctx.Done()
+ mq.RemoveWorkers(pid)
+ cancel()
+ }()
+ } else {
+ log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
+ }
+ go func() {
+ <-time.After(p.boostTimeout)
+ cancel()
+ p.lock.Lock()
+ p.blockTimeout /= 2
+ p.lock.Unlock()
+ }()
+ p.addWorkers(ctx, boost)
+ p.lock.Unlock()
+ p.dataChan <- data
+ }
+ }
+}
+
+// NumberOfWorkers returns the number of current workers in the pool
+func (p *WorkerPool) NumberOfWorkers() int {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ return p.numberOfWorkers
+}
+
+// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
+func (p *WorkerPool) MaxNumberOfWorkers() int {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ return p.maxNumberOfWorkers
+}
+
+// BoostWorkers returns the number of workers for a boost
+func (p *WorkerPool) BoostWorkers() int {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ return p.boostWorkers
+}
+
+// BoostTimeout returns the timeout of the next boost
+func (p *WorkerPool) BoostTimeout() time.Duration {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ return p.boostTimeout
+}
+
+// BlockTimeout returns the timeout til the next boost
+func (p *WorkerPool) BlockTimeout() time.Duration {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ return p.blockTimeout
+}
+
+// SetSettings sets the setable boost values
+func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ p.maxNumberOfWorkers = maxNumberOfWorkers
+ p.boostWorkers = boostWorkers
+ p.boostTimeout = timeout
+}
+
+// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool
+// Changing this number will not change the number of current workers but will change the limit
+// for future additions
+func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ p.maxNumberOfWorkers = newMax
+}
+
+// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
+func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+ var ctx context.Context
+ var cancel context.CancelFunc
+ start := time.Now()
+ end := start
+ hasTimeout := false
+ if timeout > 0 {
+ ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
+ end = start.Add(timeout)
+ hasTimeout = true
+ } else {
+ ctx, cancel = context.WithCancel(p.baseCtx)
+ }
+
+ mq := GetManager().GetManagedQueue(p.qid)
+ if mq != nil {
+ pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
+ go func() {
+ <-ctx.Done()
+ mq.RemoveWorkers(pid)
+ cancel()
+ }()
+ log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
+ } else {
+ log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
+
+ }
+ p.addWorkers(ctx, number)
+ return cancel
+}
+
+// addWorkers adds workers to the pool
+func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
+ for i := 0; i < number; i++ {
+ p.lock.Lock()
+ if p.cond == nil {
+ p.cond = sync.NewCond(&p.lock)
+ }
+ p.numberOfWorkers++
+ p.lock.Unlock()
+ go func() {
+ p.doWork(ctx)
+
+ p.lock.Lock()
+ p.numberOfWorkers--
+ if p.numberOfWorkers == 0 {
+ p.cond.Broadcast()
+ } else if p.numberOfWorkers < 0 {
+ // numberOfWorkers can't go negative but...
+ log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
+ p.numberOfWorkers = 0
+ p.cond.Broadcast()
+ }
+ p.lock.Unlock()
+ }()
+ }
+}
+
+// Wait for WorkerPool to finish
+func (p *WorkerPool) Wait() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+ if p.cond == nil {
+ p.cond = sync.NewCond(&p.lock)
+ }
+ if p.numberOfWorkers <= 0 {
+ return
+ }
+ p.cond.Wait()
+}
+
+// CleanUp will drain the remaining contents of the channel
+// This should be called after AddWorkers context is closed
+func (p *WorkerPool) CleanUp(ctx context.Context) {
+ log.Trace("WorkerPool: %d CleanUp", p.qid)
+ close(p.dataChan)
+ for data := range p.dataChan {
+ p.handle(data)
+ select {
+ case <-ctx.Done():
+ log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
+ return
+ default:
+ }
+ }
+ log.Trace("WorkerPool: %d CleanUp Done", p.qid)
+}
+
+func (p *WorkerPool) doWork(ctx context.Context) {
+ delay := time.Millisecond * 300
+ var data = make([]Data, 0, p.batchLength)
+ for {
+ select {
+ case <-ctx.Done():
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ p.handle(data...)
+ }
+ log.Trace("Worker shutting down")
+ return
+ case datum, ok := <-p.dataChan:
+ if !ok {
+ // the dataChan has been closed - we should finish up:
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ p.handle(data...)
+ }
+ log.Trace("Worker shutting down")
+ return
+ }
+ data = append(data, datum)
+ if len(data) >= p.batchLength {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ p.handle(data...)
+ data = make([]Data, 0, p.batchLength)
+ }
+ default:
+ timer := time.NewTimer(delay)
+ select {
+ case <-ctx.Done():
+ if timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ p.handle(data...)
+ }
+ log.Trace("Worker shutting down")
+ return
+ case datum, ok := <-p.dataChan:
+ if timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
+ if !ok {
+ // the dataChan has been closed - we should finish up:
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ p.handle(data...)
+ }
+ log.Trace("Worker shutting down")
+ return
+ }
+ data = append(data, datum)
+ if len(data) >= p.batchLength {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ p.handle(data...)
+ data = make([]Data, 0, p.batchLength)
+ }
+ case <-timer.C:
+ delay = time.Millisecond * 100
+ if len(data) > 0 {
+ log.Trace("Handling: %d data, %v", len(data), data)
+ p.handle(data...)
+ data = make([]Data, 0, p.batchLength)
+ }
+
+ }
+ }
+ }
+}
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
new file mode 100644
index 0000000000..546802715f
--- /dev/null
+++ b/modules/setting/queue.go
@@ -0,0 +1,159 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package setting
+
+import (
+ "fmt"
+ "path"
+ "strconv"
+ "strings"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+// QueueSettings represent the settings for a queue from the ini
+type QueueSettings struct {
+ DataDir string
+ Length int
+ BatchLength int
+ ConnectionString string
+ Type string
+ Network string
+ Addresses string
+ Password string
+ QueueName string
+ DBIndex int
+ WrapIfNecessary bool
+ MaxAttempts int
+ Timeout time.Duration
+ Workers int
+ MaxWorkers int
+ BlockTimeout time.Duration
+ BoostTimeout time.Duration
+ BoostWorkers int
+}
+
+// Queue settings
+var Queue = QueueSettings{}
+
+// GetQueueSettings returns the queue settings for the appropriately named queue
+func GetQueueSettings(name string) QueueSettings {
+ q := QueueSettings{}
+ sec := Cfg.Section("queue." + name)
+ // DataDir is not directly inheritable
+ q.DataDir = path.Join(Queue.DataDir, name)
+ // QueueName is not directly inheritable either
+ q.QueueName = name + Queue.QueueName
+ for _, key := range sec.Keys() {
+ switch key.Name() {
+ case "DATADIR":
+ q.DataDir = key.MustString(q.DataDir)
+ case "QUEUE_NAME":
+ q.QueueName = key.MustString(q.QueueName)
+ }
+ }
+ if !path.IsAbs(q.DataDir) {
+ q.DataDir = path.Join(AppDataPath, q.DataDir)
+ }
+ sec.Key("DATADIR").SetValue(q.DataDir)
+ // The rest are...
+ q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
+ q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
+ q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
+ q.Type = sec.Key("TYPE").MustString(Queue.Type)
+ q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
+ q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
+ q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
+ q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
+ q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
+ q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
+ q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
+ q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
+
+ q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
+ return q
+}
+
+// NewQueueService sets up the default settings for Queues
+// This is exported for tests to be able to use the queue
+func NewQueueService() {
+ sec := Cfg.Section("queue")
+ Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
+ if !path.IsAbs(Queue.DataDir) {
+ Queue.DataDir = path.Join(AppDataPath, Queue.DataDir)
+ }
+ Queue.Length = sec.Key("LENGTH").MustInt(20)
+ Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
+ Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
+ Queue.Type = sec.Key("TYPE").MustString("")
+ Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
+ Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
+ Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
+ Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
+ Queue.Workers = sec.Key("WORKERS").MustInt(1)
+ Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
+ Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
+ Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
+ Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
+ Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
+
+ // Now handle the old issue_indexer configuration
+ section := Cfg.Section("queue.issue_indexer")
+ issueIndexerSectionMap := map[string]string{}
+ for _, key := range section.Keys() {
+ issueIndexerSectionMap[key.Name()] = key.Value()
+ }
+ if _, ok := issueIndexerSectionMap["TYPE"]; !ok {
+ switch Indexer.IssueQueueType {
+ case LevelQueueType:
+ section.Key("TYPE").SetValue("level")
+ case ChannelQueueType:
+ section.Key("TYPE").SetValue("persistable-channel")
+ case RedisQueueType:
+ section.Key("TYPE").SetValue("redis")
+ default:
+ log.Fatal("Unsupported indexer queue type: %v",
+ Indexer.IssueQueueType)
+ }
+ }
+ if _, ok := issueIndexerSectionMap["LENGTH"]; !ok {
+ section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
+ }
+ if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok {
+ section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
+ }
+ if _, ok := issueIndexerSectionMap["DATADIR"]; !ok {
+ section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
+ }
+ if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok {
+ section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
+ }
+}
+
+// ParseQueueConnStr parses a queue connection string
+func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
+ fields := strings.Fields(connStr)
+ for _, f := range fields {
+ items := strings.SplitN(f, "=", 2)
+ if len(items) < 2 {
+ continue
+ }
+ switch strings.ToLower(items[0]) {
+ case "network":
+ network = items[1]
+ case "addrs":
+ addrs = items[1]
+ case "password":
+ password = items[1]
+ case "db":
+ dbIdx, err = strconv.Atoi(items[1])
+ if err != nil {
+ return
+ }
+ }
+ }
+ return
+}
diff --git a/modules/setting/setting.go b/modules/setting/setting.go
index 2a5e37b41b..17c84d3d31 100644
--- a/modules/setting/setting.go
+++ b/modules/setting/setting.go
@@ -1093,4 +1093,5 @@ func NewServices() {
newMigrationsService()
newIndexerService()
newTaskService()
+ NewQueueService()
}
diff --git a/modules/setting/task.go b/modules/setting/task.go
index 97704d4a4d..81ed39a9fb 100644
--- a/modules/setting/task.go
+++ b/modules/setting/task.go
@@ -4,22 +4,15 @@
package setting
-var (
- // Task settings
- Task = struct {
- QueueType string
- QueueLength int
- QueueConnStr string
- }{
- QueueType: ChannelQueueType,
- QueueLength: 1000,
- QueueConnStr: "addrs=127.0.0.1:6379 db=0",
- }
-)
-
func newTaskService() {
- sec := Cfg.Section("task")
- Task.QueueType = sec.Key("QUEUE_TYPE").MustString(ChannelQueueType)
- Task.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000)
- Task.QueueConnStr = sec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")
+ taskSec := Cfg.Section("task")
+ queueTaskSec := Cfg.Section("queue.task")
+ switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) {
+ case ChannelQueueType:
+ queueTaskSec.Key("TYPE").MustString("persistable-channel")
+ case RedisQueueType:
+ queueTaskSec.Key("TYPE").MustString("redis")
+ }
+ queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000))
+ queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0"))
}
diff --git a/modules/task/queue.go b/modules/task/queue.go
deleted file mode 100644
index ddee0b3d46..0000000000
--- a/modules/task/queue.go
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright 2019 Gitea. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import "code.gitea.io/gitea/models"
-
-// Queue defines an interface to run task queue
-type Queue interface {
- Run() error
- Push(*models.Task) error
- Stop()
-}
diff --git a/modules/task/queue_channel.go b/modules/task/queue_channel.go
deleted file mode 100644
index da541f4755..0000000000
--- a/modules/task/queue_channel.go
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import (
- "code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/log"
-)
-
-var (
- _ Queue = &ChannelQueue{}
-)
-
-// ChannelQueue implements
-type ChannelQueue struct {
- queue chan *models.Task
-}
-
-// NewChannelQueue create a memory channel queue
-func NewChannelQueue(queueLen int) *ChannelQueue {
- return &ChannelQueue{
- queue: make(chan *models.Task, queueLen),
- }
-}
-
-// Run starts to run the queue
-func (c *ChannelQueue) Run() error {
- for task := range c.queue {
- err := Run(task)
- if err != nil {
- log.Error("Run task failed: %s", err.Error())
- }
- }
- return nil
-}
-
-// Push will push the task ID to queue
-func (c *ChannelQueue) Push(task *models.Task) error {
- c.queue <- task
- return nil
-}
-
-// Stop stop the queue
-func (c *ChannelQueue) Stop() {
- close(c.queue)
-}
diff --git a/modules/task/queue_redis.go b/modules/task/queue_redis.go
deleted file mode 100644
index 127de0cdbf..0000000000
--- a/modules/task/queue_redis.go
+++ /dev/null
@@ -1,130 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import (
- "encoding/json"
- "errors"
- "strconv"
- "strings"
- "time"
-
- "code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/log"
-
- "github.com/go-redis/redis"
-)
-
-var (
- _ Queue = &RedisQueue{}
-)
-
-type redisClient interface {
- RPush(key string, args ...interface{}) *redis.IntCmd
- LPop(key string) *redis.StringCmd
- Ping() *redis.StatusCmd
-}
-
-// RedisQueue redis queue
-type RedisQueue struct {
- client redisClient
- queueName string
- closeChan chan bool
-}
-
-func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
- fields := strings.Fields(connStr)
- for _, f := range fields {
- items := strings.SplitN(f, "=", 2)
- if len(items) < 2 {
- continue
- }
- switch strings.ToLower(items[0]) {
- case "addrs":
- addrs = items[1]
- case "password":
- password = items[1]
- case "db":
- dbIdx, err = strconv.Atoi(items[1])
- if err != nil {
- return
- }
- }
- }
- return
-}
-
-// NewRedisQueue creates single redis or cluster redis queue
-func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) {
- dbs := strings.Split(addrs, ",")
- var queue = RedisQueue{
- queueName: "task_queue",
- closeChan: make(chan bool),
- }
- if len(dbs) == 0 {
- return nil, errors.New("no redis host found")
- } else if len(dbs) == 1 {
- queue.client = redis.NewClient(&redis.Options{
- Addr: strings.TrimSpace(dbs[0]), // use default Addr
- Password: password, // no password set
- DB: dbIdx, // use default DB
- })
- } else {
- // cluster will ignore db
- queue.client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: dbs,
- Password: password,
- })
- }
- if err := queue.client.Ping().Err(); err != nil {
- return nil, err
- }
- return &queue, nil
-}
-
-// Run starts to run the queue
-func (r *RedisQueue) Run() error {
- for {
- select {
- case <-r.closeChan:
- return nil
- case <-time.After(time.Millisecond * 100):
- }
-
- bs, err := r.client.LPop(r.queueName).Bytes()
- if err != nil {
- if err != redis.Nil {
- log.Error("LPop failed: %v", err)
- }
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- var task models.Task
- err = json.Unmarshal(bs, &task)
- if err != nil {
- log.Error("Unmarshal task failed: %s", err.Error())
- } else {
- err = Run(&task)
- if err != nil {
- log.Error("Run task failed: %s", err.Error())
- }
- }
- }
-}
-
-// Push implements Queue
-func (r *RedisQueue) Push(task *models.Task) error {
- bs, err := json.Marshal(task)
- if err != nil {
- return err
- }
- return r.client.RPush(r.queueName, bs).Err()
-}
-
-// Stop stop the queue
-func (r *RedisQueue) Stop() {
- r.closeChan <- true
-}
diff --git a/modules/task/task.go b/modules/task/task.go
index 64744afe7a..416f0c696a 100644
--- a/modules/task/task.go
+++ b/modules/task/task.go
@@ -8,14 +8,15 @@ import (
"fmt"
"code.gitea.io/gitea/models"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/migrations/base"
- "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/structs"
)
// taskQueue is a global queue of tasks
-var taskQueue Queue
+var taskQueue queue.Queue
// Run a task
func Run(t *models.Task) error {
@@ -23,38 +24,32 @@ func Run(t *models.Task) error {
case structs.TaskTypeMigrateRepo:
return runMigrateTask(t)
default:
- return fmt.Errorf("Unknow task type: %d", t.Type)
+ return fmt.Errorf("Unknown task type: %d", t.Type)
}
}
// Init will start the service to get all unfinished tasks and run them
func Init() error {
- switch setting.Task.QueueType {
- case setting.ChannelQueueType:
- taskQueue = NewChannelQueue(setting.Task.QueueLength)
- case setting.RedisQueueType:
- var err error
- addrs, pass, idx, err := parseConnStr(setting.Task.QueueConnStr)
- if err != nil {
- return err
- }
- taskQueue, err = NewRedisQueue(addrs, pass, idx)
- if err != nil {
- return err
- }
- default:
- return fmt.Errorf("Unsupported task queue type: %v", setting.Task.QueueType)
+ taskQueue = queue.CreateQueue("task", handle, &models.Task{})
+
+ if taskQueue == nil {
+ return fmt.Errorf("Unable to create Task Queue")
}
- go func() {
- if err := taskQueue.Run(); err != nil {
- log.Error("taskQueue.Run end failed: %v", err)
- }
- }()
+ go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)
return nil
}
+func handle(data ...queue.Data) {
+ for _, datum := range data {
+ task := datum.(*models.Task)
+ if err := Run(task); err != nil {
+ log.Error("Run task failed: %v", err)
+ }
+ }
+}
+
// MigrateRepository add migration repository to task
func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error {
task, err := models.CreateMigrateTask(doer, u, opts)