]> source.dussan.org Git - gitea.git/commitdiff
Graceful Queues: Issue Indexing and Tasks (#9363)
authorzeripath <art27@cantab.net>
Tue, 7 Jan 2020 11:23:09 +0000 (11:23 +0000)
committerAntoine GIRARD <sapk@users.noreply.github.com>
Tue, 7 Jan 2020 11:23:09 +0000 (12:23 +0100)
* Queue: Add generic graceful queues with settings

* Queue & Setting: Add worker pool implementation

* Queue: Add worker settings

* Queue: Make resizing worker pools

* Queue: Add name variable to queues

* Queue: Add monitoring

* Queue: Improve logging

* Issues: Gracefulise the issues indexer

Remove the old now unused specific queues

* Task: Move to generic queue and gracefulise

* Issues: Standardise the issues indexer queue settings

* Fix test

* Queue: Allow Redis to connect to unix

* Prevent deadlock during early shutdown of issue indexer

* Add MaxWorker settings to queues

* Merge branch 'master' into graceful-queues

* Update modules/indexer/issues/indexer.go

Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/indexer/issues/indexer.go

Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_channel.go

Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_disk.go

* Update modules/queue/queue_disk_channel.go

Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Rename queue.Description to queue.ManagedQueue as per @guillep2k

* Cancel pool workers when removed

* Remove dependency on queue from setting

* Update modules/queue/queue_redis.go

Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* As per @guillep2k add mutex locks on shutdown/terminate

* move unlocking out of setInternal

* Add warning if number of workers < 0

* Small changes as per @guillep2k

* No redis host specified not found

* Clean up documentation for queues

* Update docs/content/doc/advanced/config-cheat-sheet.en-us.md

* Update modules/indexer/issues/indexer_test.go

* Ensure that persistable channel queue is added to manager

* Rename QUEUE_NAME REDIS_QUEUE_NAME

* Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME"

This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df.

Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: techknowlogick <matti@mdranta.net>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
35 files changed:
custom/conf/app.ini.sample
docs/content/doc/advanced/config-cheat-sheet.en-us.md
integrations/issue_test.go
modules/indexer/issues/db.go
modules/indexer/issues/indexer.go
modules/indexer/issues/indexer_test.go
modules/indexer/issues/queue.go [deleted file]
modules/indexer/issues/queue_channel.go [deleted file]
modules/indexer/issues/queue_disk.go [deleted file]
modules/indexer/issues/queue_redis.go [deleted file]
modules/queue/manager.go [new file with mode: 0644]
modules/queue/queue.go [new file with mode: 0644]
modules/queue/queue_channel.go [new file with mode: 0644]
modules/queue/queue_channel_test.go [new file with mode: 0644]
modules/queue/queue_disk.go [new file with mode: 0644]
modules/queue/queue_disk_channel.go [new file with mode: 0644]
modules/queue/queue_disk_channel_test.go [new file with mode: 0644]
modules/queue/queue_disk_test.go [new file with mode: 0644]
modules/queue/queue_redis.go [new file with mode: 0644]
modules/queue/queue_test.go [new file with mode: 0644]
modules/queue/queue_wrapped.go [new file with mode: 0644]
modules/queue/setting.go [new file with mode: 0644]
modules/queue/workerpool.go [new file with mode: 0644]
modules/setting/queue.go [new file with mode: 0644]
modules/setting/setting.go
modules/setting/task.go
modules/task/queue.go [deleted file]
modules/task/queue_channel.go [deleted file]
modules/task/queue_redis.go [deleted file]
modules/task/task.go
options/locale/locale_en-US.ini
routers/admin/admin.go
routers/routes/routes.go
templates/admin/monitor.tmpl
templates/admin/queue.tmpl [new file with mode: 0644]

index 8b72ead3d60fef2faf35c88310838995f8389b34..29e147add8a6fab05f8eb41d60c3de9f89cfc914 100644 (file)
@@ -382,6 +382,39 @@ REPO_INDEXER_INCLUDE =
 ; A comma separated list of glob patterns to exclude from the index; ; default is empty
 REPO_INDEXER_EXCLUDE =
 
+[queue]
+; Specific queues can be individually configured with [queue.name]. [queue] provides defaults
+;
+; General queue queue type, currently support: persistable-channel, channel, level, redis, dummy
+; default to persistable-channel
+TYPE = persistable-channel
+; data-dir for storing persistable queues and level queues, individual queues will be named by their type
+DATADIR = queues/
+; Default queue length before a channel queue will block
+LENGTH = 20
+; Batch size to send for batched queues
+BATCH_LENGTH = 20
+; Connection string for redis queues this will store the redis connection string.
+CONN_STR = "addrs=127.0.0.1:6379 db=0"
+; Provide the suffix of the default redis queue name - specific queues can be overriden within in their [queue.name] sections.
+QUEUE_NAME = "_queue"
+; If the queue cannot be created at startup - level queues may need a timeout at startup - wrap the queue:
+WRAP_IF_NECESSARY = true
+; Attempt to create the wrapped queue at max
+MAX_ATTEMPTS = 10
+; Timeout queue creation
+TIMEOUT = 15m30s
+; Create a pool with this many workers
+WORKERS = 1
+; Dynamically scale the worker pool to at this many workers
+MAX_WORKERS = 10
+; Add boost workers when the queue blocks for BLOCK_TIMEOUT
+BLOCK_TIMEOUT = 1s
+; Remove the boost workers after BOOST_TIMEOUT
+BOOST_TIMEOUT = 5m
+; During a boost add BOOST_WORKERS
+BOOST_WORKERS = 5
+
 [admin]
 ; Disallow regular (non-admin) users from creating organizations.
 DISABLE_REGULAR_ORG_CREATION = false
index 691c543a6ca28a9685b5ffa5f10a24adbc02a876..dc6a1ba34697a417005f4154ea151282b6c4cef1 100644 (file)
@@ -226,6 +226,7 @@ relation to port exhaustion.
 
 - `ISSUE_INDEXER_TYPE`: **bleve**: Issue indexer type, currently support: bleve or db, if it's db, below issue indexer item will be invalid.
 - `ISSUE_INDEXER_PATH`: **indexers/issues.bleve**: Index file used for issue search.
+- The next 4 configuration values are deprecated and should be set in `queue.issue_indexer` however are kept for backwards compatibility:
 - `ISSUE_INDEXER_QUEUE_TYPE`: **levelqueue**: Issue indexer queue, currently supports:`channel`, `levelqueue`, `redis`.
 - `ISSUE_INDEXER_QUEUE_DIR`: **indexers/issues.queue**: When `ISSUE_INDEXER_QUEUE_TYPE` is `levelqueue`, this will be the queue will be saved path.
 - `ISSUE_INDEXER_QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: When `ISSUE_INDEXER_QUEUE_TYPE` is `redis`, this will store the redis connection string.
@@ -239,6 +240,24 @@ relation to port exhaustion.
 - `MAX_FILE_SIZE`: **1048576**: Maximum size in bytes of files to be indexed.
 - `STARTUP_TIMEOUT`: **30s**: If the indexer takes longer than this timeout to start - fail. (This timeout will be added to the hammer time above for child processes - as bleve will not start until the previous parent is shutdown.) Set to zero to never timeout.
 
+## Queue (`queue` and `queue.*`)
+
+- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel`, `channel`, `level`, `redis`, `dummy`
+- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for inidividual queues can be set in `queue.name` sections but will default to `DATADIR/`**`name`**.
+- `LENGTH`: **20**: Maximal queue size before channel queues block
+- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
+- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.
+- `QUEUE_NAME`: **_queue**: The suffix for default redis queue name. Individual queues will default to **`name`**`QUEUE_NAME` but can be overriden in the specific `queue.name` section.
+- `WRAP_IF_NECESSARY`: **true**: Will wrap queues with a timeoutable queue if the selected queue is not ready to be created - (Only relevant for the level queue.)
+- `MAX_ATTEMPTS`: **10**: Maximum number of attempts to create the wrapped queue
+- `TIMEOUT`: **GRACEFUL_HAMMER_TIME + 30s**: Timeout the creation of the wrapped queue if it takes longer than this to create.
+- Queues by default come with a dynamically scaling worker pool. The following settings configure this:
+- `WORKERS`: **1**: Number of initial workers for the queue.
+- `MAX_WORKERS`: **10**: Maximum number of worker go-routines for the queue.
+- `BLOCK_TIMEOUT`: **1s**: If the queue blocks for this time, boost the number of workers - the `BLOCK_TIMEOUT` will then be doubled before boosting again whilst the boost is ongoing.
+- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
+- `BOOST_WORKERS`: **5**: This many workers will be added to the worker pool if there is a boost.
+
 ## Admin (`admin`)
 - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
 
@@ -614,6 +633,7 @@ You may redefine `ELEMENT`, `ALLOW_ATTR`, and `REGEXP` multiple times; each time
 
 ## Task (`task`)
 
+-  Task queue configuration has been moved to `queue.task` however, the below configuration values are kept for backwards compatibilityx:
 - `QUEUE_TYPE`: **channel**: Task queue type, could be `channel` or `redis`.
 - `QUEUE_LENGTH`: **1000**: Task queue length, available only when `QUEUE_TYPE` is `channel`.
 - `QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Task queue connection string, available only when `QUEUE_TYPE` is `redis`. If there redis needs a password, use `addrs=127.0.0.1:6379 password=123 db=0`.
index fe66a005047fe7fcc3f30e6c2efd60b362dcf329..1454d758850198801f9618c51a21c3155a2d8725 100644 (file)
@@ -11,8 +11,10 @@ import (
        "strconv"
        "strings"
        "testing"
+       "time"
 
        "code.gitea.io/gitea/models"
+       "code.gitea.io/gitea/modules/indexer/issues"
        "code.gitea.io/gitea/modules/references"
        "code.gitea.io/gitea/modules/setting"
        "code.gitea.io/gitea/modules/test"
@@ -87,7 +89,12 @@ func TestViewIssuesKeyword(t *testing.T) {
        defer prepareTestEnv(t)()
 
        repo := models.AssertExistsAndLoadBean(t, &models.Repository{ID: 1}).(*models.Repository)
-
+       issue := models.AssertExistsAndLoadBean(t, &models.Issue{
+               RepoID: repo.ID,
+               Index:  1,
+       }).(*models.Issue)
+       issues.UpdateIssueIndexer(issue)
+       time.Sleep(time.Second * 1)
        const keyword = "first"
        req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.RelLink(), keyword)
        resp := MakeRequest(t, req, http.StatusOK)
index a758cfeaeebdda80d79fa2d3fd2572a8d258304a..d0cca4fd18087e3645e92376ca9b7cb4a620680e 100644 (file)
@@ -25,6 +25,10 @@ func (db *DBIndexer) Delete(ids ...int64) error {
        return nil
 }
 
+// Close dummy function
+func (db *DBIndexer) Close() {
+}
+
 // Search dummy function
 func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
        total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
index ebcd3f68dd51c15d4db408f5c37ef213372e837e..894f37a96315f29c4a7976b7b62bf79d717a7c05 100644 (file)
@@ -5,12 +5,16 @@
 package issues
 
 import (
+       "context"
+       "fmt"
+       "os"
        "sync"
        "time"
 
        "code.gitea.io/gitea/models"
        "code.gitea.io/gitea/modules/graceful"
        "code.gitea.io/gitea/modules/log"
+       "code.gitea.io/gitea/modules/queue"
        "code.gitea.io/gitea/modules/setting"
        "code.gitea.io/gitea/modules/util"
 )
@@ -44,12 +48,14 @@ type Indexer interface {
        Index(issue []*IndexerData) error
        Delete(ids ...int64) error
        Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
+       Close()
 }
 
 type indexerHolder struct {
-       indexer Indexer
-       mutex   sync.RWMutex
-       cond    *sync.Cond
+       indexer   Indexer
+       mutex     sync.RWMutex
+       cond      *sync.Cond
+       cancelled bool
 }
 
 func newIndexerHolder() *indexerHolder {
@@ -58,6 +64,13 @@ func newIndexerHolder() *indexerHolder {
        return h
 }
 
+func (h *indexerHolder) cancel() {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+       h.cancelled = true
+       h.cond.Broadcast()
+}
+
 func (h *indexerHolder) set(indexer Indexer) {
        h.mutex.Lock()
        defer h.mutex.Unlock()
@@ -68,16 +81,15 @@ func (h *indexerHolder) set(indexer Indexer) {
 func (h *indexerHolder) get() Indexer {
        h.mutex.RLock()
        defer h.mutex.RUnlock()
-       if h.indexer == nil {
+       if h.indexer == nil && !h.cancelled {
                h.cond.Wait()
        }
        return h.indexer
 }
 
 var (
-       issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
        // issueIndexerQueue queue of issue ids to be updated
-       issueIndexerQueue Queue
+       issueIndexerQueue queue.Queue
        holder            = newIndexerHolder()
 )
 
@@ -85,90 +97,99 @@ var (
 // all issue index done.
 func InitIssueIndexer(syncReindex bool) {
        waitChannel := make(chan time.Duration)
+
+       // Create the Queue
+       switch setting.Indexer.IssueType {
+       case "bleve":
+               handler := func(data ...queue.Data) {
+                       indexer := holder.get()
+                       if indexer == nil {
+                               log.Error("Issue indexer handler: unable to get indexer!")
+                               return
+                       }
+
+                       iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
+                       for _, datum := range data {
+                               indexerData, ok := datum.(*IndexerData)
+                               if !ok {
+                                       log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
+                                       continue
+                               }
+                               log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
+                               if indexerData.IsDelete {
+                                       _ = indexer.Delete(indexerData.IDs...)
+                                       continue
+                               }
+                               iData = append(iData, indexerData)
+                       }
+                       if err := indexer.Index(iData); err != nil {
+                               log.Error("Error whilst indexing: %v Error: %v", iData, err)
+                       }
+               }
+
+               issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
+
+               if issueIndexerQueue == nil {
+                       log.Fatal("Unable to create issue indexer queue")
+               }
+       default:
+               issueIndexerQueue = &queue.DummyQueue{}
+       }
+
+       // Create the Indexer
        go func() {
                start := time.Now()
-               log.Info("Initializing Issue Indexer")
+               log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
                var populate bool
-               var dummyQueue bool
                switch setting.Indexer.IssueType {
                case "bleve":
-                       issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
-                       exist, err := issueIndexer.Init()
-                       if err != nil {
-                               log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
-                       }
-                       populate = !exist
-                       holder.set(issueIndexer)
+                       graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
+                               issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
+                               exist, err := issueIndexer.Init()
+                               if err != nil {
+                                       holder.cancel()
+                                       log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
+                               }
+                               populate = !exist
+                               holder.set(issueIndexer)
+                               atTerminate(context.Background(), func() {
+                                       log.Debug("Closing issue indexer")
+                                       issueIndexer := holder.get()
+                                       if issueIndexer != nil {
+                                               issueIndexer.Close()
+                                       }
+                                       log.Info("PID: %d Issue Indexer closed", os.Getpid())
+                               })
+                               log.Debug("Created Bleve Indexer")
+                       })
                case "db":
                        issueIndexer := &DBIndexer{}
                        holder.set(issueIndexer)
-                       dummyQueue = true
                default:
+                       holder.cancel()
                        log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
                }
 
-               if dummyQueue {
-                       issueIndexerQueue = &DummyQueue{}
-               } else {
-                       var err error
-                       switch setting.Indexer.IssueQueueType {
-                       case setting.LevelQueueType:
-                               issueIndexerQueue, err = NewLevelQueue(
-                                       holder.get(),
-                                       setting.Indexer.IssueQueueDir,
-                                       setting.Indexer.IssueQueueBatchNumber)
-                               if err != nil {
-                                       log.Fatal(
-                                               "Unable create level queue for issue queue dir: %s batch number: %d : %v",
-                                               setting.Indexer.IssueQueueDir,
-                                               setting.Indexer.IssueQueueBatchNumber,
-                                               err)
-                               }
-                       case setting.ChannelQueueType:
-                               issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber)
-                       case setting.RedisQueueType:
-                               addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
-                               if err != nil {
-                                       log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
-                                               setting.Indexer.IssueQueueConnStr,
-                                               err)
-                               }
-                               issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber)
-                               if err != nil {
-                                       log.Fatal("Unable to create RedisQueue: %s : %v",
-                                               setting.Indexer.IssueQueueConnStr,
-                                               err)
-                               }
-                       default:
-                               log.Fatal("Unsupported indexer queue type: %v",
-                                       setting.Indexer.IssueQueueType)
-                       }
-
-                       go func() {
-                               err = issueIndexerQueue.Run()
-                               if err != nil {
-                                       log.Error("issueIndexerQueue.Run: %v", err)
-                               }
-                       }()
-               }
-
-               go func() {
-                       for data := range issueIndexerChannel {
-                               _ = issueIndexerQueue.Push(data)
-                       }
-               }()
+               // Start processing the queue
+               go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
 
+               // Populate the index
                if populate {
                        if syncReindex {
-                               populateIssueIndexer()
+                               graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
                        } else {
-                               go populateIssueIndexer()
+                               go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
                        }
                }
                waitChannel <- time.Since(start)
+               close(waitChannel)
        }()
+
        if syncReindex {
-               <-waitChannel
+               select {
+               case <-waitChannel:
+               case <-graceful.GetManager().IsShutdown():
+               }
        } else if setting.Indexer.StartupTimeout > 0 {
                go func() {
                        timeout := setting.Indexer.StartupTimeout
@@ -178,7 +199,12 @@ func InitIssueIndexer(syncReindex bool) {
                        select {
                        case duration := <-waitChannel:
                                log.Info("Issue Indexer Initialization took %v", duration)
+                       case <-graceful.GetManager().IsShutdown():
+                               log.Warn("Shutdown occurred before issue index initialisation was complete")
                        case <-time.After(timeout):
+                               if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
+                                       shutdownable.Terminate()
+                               }
                                log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
                        }
                }()
@@ -186,8 +212,14 @@ func InitIssueIndexer(syncReindex bool) {
 }
 
 // populateIssueIndexer populate the issue indexer with issue data
-func populateIssueIndexer() {
+func populateIssueIndexer(ctx context.Context) {
        for page := 1; ; page++ {
+               select {
+               case <-ctx.Done():
+                       log.Warn("Issue Indexer population shutdown before completion")
+                       return
+               default:
+               }
                repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
                        Page:        page,
                        PageSize:    models.RepositoryListDefaultPageSize,
@@ -200,10 +232,17 @@ func populateIssueIndexer() {
                        continue
                }
                if len(repos) == 0 {
+                       log.Debug("Issue Indexer population complete")
                        return
                }
 
                for _, repo := range repos {
+                       select {
+                       case <-ctx.Done():
+                               log.Info("Issue Indexer population shutdown before completion")
+                               return
+                       default:
+                       }
                        UpdateRepoIndexer(repo)
                }
        }
@@ -237,13 +276,17 @@ func UpdateIssueIndexer(issue *models.Issue) {
                        comments = append(comments, comment.Content)
                }
        }
-       issueIndexerChannel <- &IndexerData{
+       indexerData := &IndexerData{
                ID:       issue.ID,
                RepoID:   issue.RepoID,
                Title:    issue.Title,
                Content:  issue.Content,
                Comments: comments,
        }
+       log.Debug("Adding to channel: %v", indexerData)
+       if err := issueIndexerQueue.Push(indexerData); err != nil {
+               log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+       }
 }
 
 // DeleteRepoIssueIndexer deletes repo's all issues indexes
@@ -258,17 +301,25 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
        if len(ids) == 0 {
                return
        }
-
-       issueIndexerChannel <- &IndexerData{
+       indexerData := &IndexerData{
                IDs:      ids,
                IsDelete: true,
        }
+       if err := issueIndexerQueue.Push(indexerData); err != nil {
+               log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
+       }
 }
 
 // SearchIssuesByKeyword search issue ids by keywords and repo id
 func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
        var issueIDs []int64
-       res, err := holder.get().Search(keyword, repoIDs, 1000, 0)
+       indexer := holder.get()
+
+       if indexer == nil {
+               log.Error("SearchIssuesByKeyword(): unable to get indexer!")
+               return nil, fmt.Errorf("unable to get issue indexer")
+       }
+       res, err := indexer.Search(keyword, repoIDs, 1000, 0)
        if err != nil {
                return nil, err
        }
index ca7ba29703fe87cd8aecbf0d5e7975cccafbd296..4028a6c8b518d541d2b89ff1c4c5256dc52eee94 100644 (file)
@@ -15,6 +15,8 @@ import (
        "code.gitea.io/gitea/models"
        "code.gitea.io/gitea/modules/setting"
 
+       "gopkg.in/ini.v1"
+
        "github.com/stretchr/testify/assert"
 )
 
@@ -24,6 +26,7 @@ func TestMain(m *testing.M) {
 
 func TestBleveSearchIssues(t *testing.T) {
        assert.NoError(t, models.PrepareTestDatabase())
+       setting.Cfg = ini.Empty()
 
        tmpIndexerDir, err := ioutil.TempDir("", "issues-indexer")
        if err != nil {
@@ -41,6 +44,7 @@ func TestBleveSearchIssues(t *testing.T) {
        }()
 
        setting.Indexer.IssueType = "bleve"
+       setting.NewQueueService()
        InitIssueIndexer(true)
        defer func() {
                indexer := holder.get()
diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go
deleted file mode 100644 (file)
index f93e5c4..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2018 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-// Queue defines an interface to save an issue indexer queue
-type Queue interface {
-       Run() error
-       Push(*IndexerData) error
-}
-
-// DummyQueue represents an empty queue
-type DummyQueue struct {
-}
-
-// Run starts to run the queue
-func (b *DummyQueue) Run() error {
-       return nil
-}
-
-// Push pushes data to indexer
-func (b *DummyQueue) Push(*IndexerData) error {
-       return nil
-}
diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go
deleted file mode 100644 (file)
index b6458d3..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright 2018 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
-       "time"
-
-       "code.gitea.io/gitea/modules/setting"
-)
-
-// ChannelQueue implements
-type ChannelQueue struct {
-       queue       chan *IndexerData
-       indexer     Indexer
-       batchNumber int
-}
-
-// NewChannelQueue create a memory channel queue
-func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
-       return &ChannelQueue{
-               queue:       make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
-               indexer:     indexer,
-               batchNumber: batchNumber,
-       }
-}
-
-// Run starts to run the queue
-func (c *ChannelQueue) Run() error {
-       var i int
-       var datas = make([]*IndexerData, 0, c.batchNumber)
-       for {
-               select {
-               case data := <-c.queue:
-                       if data.IsDelete {
-                               _ = c.indexer.Delete(data.IDs...)
-                               continue
-                       }
-
-                       datas = append(datas, data)
-                       if len(datas) >= c.batchNumber {
-                               _ = c.indexer.Index(datas)
-                               // TODO: save the point
-                               datas = make([]*IndexerData, 0, c.batchNumber)
-                       }
-               case <-time.After(time.Millisecond * 100):
-                       i++
-                       if i >= 3 && len(datas) > 0 {
-                               _ = c.indexer.Index(datas)
-                               // TODO: save the point
-                               datas = make([]*IndexerData, 0, c.batchNumber)
-                       }
-               }
-       }
-}
-
-// Push will push the indexer data to queue
-func (c *ChannelQueue) Push(data *IndexerData) error {
-       c.queue <- data
-       return nil
-}
diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go
deleted file mode 100644 (file)
index d6187f2..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
-       "encoding/json"
-       "time"
-
-       "code.gitea.io/gitea/modules/log"
-
-       "gitea.com/lunny/levelqueue"
-)
-
-var (
-       _ Queue = &LevelQueue{}
-)
-
-// LevelQueue implements a disk library queue
-type LevelQueue struct {
-       indexer     Indexer
-       queue       *levelqueue.Queue
-       batchNumber int
-}
-
-// NewLevelQueue creates a ledis local queue
-func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
-       queue, err := levelqueue.Open(dataDir)
-       if err != nil {
-               return nil, err
-       }
-
-       return &LevelQueue{
-               indexer:     indexer,
-               queue:       queue,
-               batchNumber: batchNumber,
-       }, nil
-}
-
-// Run starts to run the queue
-func (l *LevelQueue) Run() error {
-       var i int
-       var datas = make([]*IndexerData, 0, l.batchNumber)
-       for {
-               i++
-               if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
-                       _ = l.indexer.Index(datas)
-                       datas = make([]*IndexerData, 0, l.batchNumber)
-                       i = 0
-                       continue
-               }
-
-               bs, err := l.queue.RPop()
-               if err != nil {
-                       if err != levelqueue.ErrNotFound {
-                               log.Error("RPop: %v", err)
-                       }
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               if len(bs) == 0 {
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               var data IndexerData
-               err = json.Unmarshal(bs, &data)
-               if err != nil {
-                       log.Error("Unmarshal: %v", err)
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               log.Trace("LevelQueue: task found: %#v", data)
-
-               if data.IsDelete {
-                       if data.ID > 0 {
-                               if err = l.indexer.Delete(data.ID); err != nil {
-                                       log.Error("indexer.Delete: %v", err)
-                               }
-                       } else if len(data.IDs) > 0 {
-                               if err = l.indexer.Delete(data.IDs...); err != nil {
-                                       log.Error("indexer.Delete: %v", err)
-                               }
-                       }
-                       time.Sleep(time.Millisecond * 10)
-                       continue
-               }
-
-               datas = append(datas, &data)
-               time.Sleep(time.Millisecond * 10)
-       }
-}
-
-// Push will push the indexer data to queue
-func (l *LevelQueue) Push(data *IndexerData) error {
-       bs, err := json.Marshal(data)
-       if err != nil {
-               return err
-       }
-       return l.queue.LPush(bs)
-}
diff --git a/modules/indexer/issues/queue_redis.go b/modules/indexer/issues/queue_redis.go
deleted file mode 100644 (file)
index 0344d3c..0000000
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package issues
-
-import (
-       "encoding/json"
-       "errors"
-       "strconv"
-       "strings"
-       "time"
-
-       "code.gitea.io/gitea/modules/log"
-
-       "github.com/go-redis/redis"
-)
-
-var (
-       _ Queue = &RedisQueue{}
-)
-
-type redisClient interface {
-       RPush(key string, args ...interface{}) *redis.IntCmd
-       LPop(key string) *redis.StringCmd
-       Ping() *redis.StatusCmd
-}
-
-// RedisQueue redis queue
-type RedisQueue struct {
-       client      redisClient
-       queueName   string
-       indexer     Indexer
-       batchNumber int
-}
-
-func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
-       fields := strings.Fields(connStr)
-       for _, f := range fields {
-               items := strings.SplitN(f, "=", 2)
-               if len(items) < 2 {
-                       continue
-               }
-               switch strings.ToLower(items[0]) {
-               case "addrs":
-                       addrs = items[1]
-               case "password":
-                       password = items[1]
-               case "db":
-                       dbIdx, err = strconv.Atoi(items[1])
-                       if err != nil {
-                               return
-                       }
-               }
-       }
-       return
-}
-
-// NewRedisQueue creates single redis or cluster redis queue
-func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) {
-       dbs := strings.Split(addrs, ",")
-       var queue = RedisQueue{
-               queueName:   "issue_indexer_queue",
-               indexer:     indexer,
-               batchNumber: batchNumber,
-       }
-       if len(dbs) == 0 {
-               return nil, errors.New("no redis host found")
-       } else if len(dbs) == 1 {
-               queue.client = redis.NewClient(&redis.Options{
-                       Addr:     strings.TrimSpace(dbs[0]), // use default Addr
-                       Password: password,                  // no password set
-                       DB:       dbIdx,                     // use default DB
-               })
-       } else {
-               queue.client = redis.NewClusterClient(&redis.ClusterOptions{
-                       Addrs: dbs,
-               })
-       }
-       if err := queue.client.Ping().Err(); err != nil {
-               return nil, err
-       }
-       return &queue, nil
-}
-
-// Run runs the redis queue
-func (r *RedisQueue) Run() error {
-       var i int
-       var datas = make([]*IndexerData, 0, r.batchNumber)
-       for {
-               bs, err := r.client.LPop(r.queueName).Bytes()
-               if err != nil && err != redis.Nil {
-                       log.Error("LPop faile: %v", err)
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               i++
-               if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) {
-                       _ = r.indexer.Index(datas)
-                       datas = make([]*IndexerData, 0, r.batchNumber)
-                       i = 0
-               }
-
-               if len(bs) == 0 {
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               var data IndexerData
-               err = json.Unmarshal(bs, &data)
-               if err != nil {
-                       log.Error("Unmarshal: %v", err)
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               log.Trace("RedisQueue: task found: %#v", data)
-
-               if data.IsDelete {
-                       if data.ID > 0 {
-                               if err = r.indexer.Delete(data.ID); err != nil {
-                                       log.Error("indexer.Delete: %v", err)
-                               }
-                       } else if len(data.IDs) > 0 {
-                               if err = r.indexer.Delete(data.IDs...); err != nil {
-                                       log.Error("indexer.Delete: %v", err)
-                               }
-                       }
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               datas = append(datas, &data)
-               time.Sleep(time.Millisecond * 100)
-       }
-}
-
-// Push implements Queue
-func (r *RedisQueue) Push(data *IndexerData) error {
-       bs, err := json.Marshal(data)
-       if err != nil {
-               return err
-       }
-       return r.client.RPush(r.queueName, bs).Err()
-}
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
new file mode 100644 (file)
index 0000000..88b2644
--- /dev/null
@@ -0,0 +1,270 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "reflect"
+       "sort"
+       "sync"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+)
+
+var manager *Manager
+
+// Manager is a queue manager
+type Manager struct {
+       mutex sync.Mutex
+
+       counter int64
+       Queues  map[int64]*ManagedQueue
+}
+
+// ManagedQueue represents a working queue inheriting from Gitea.
+type ManagedQueue struct {
+       mutex         sync.Mutex
+       QID           int64
+       Queue         Queue
+       Type          Type
+       Name          string
+       Configuration interface{}
+       ExemplarType  string
+       Pool          ManagedPool
+       counter       int64
+       PoolWorkers   map[int64]*PoolWorkers
+}
+
+// ManagedPool is a simple interface to get certain details from a worker pool
+type ManagedPool interface {
+       AddWorkers(number int, timeout time.Duration) context.CancelFunc
+       NumberOfWorkers() int
+       MaxNumberOfWorkers() int
+       SetMaxNumberOfWorkers(int)
+       BoostTimeout() time.Duration
+       BlockTimeout() time.Duration
+       BoostWorkers() int
+       SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+}
+
+// ManagedQueueList implements the sort.Interface
+type ManagedQueueList []*ManagedQueue
+
+// PoolWorkers represents a working queue inheriting from Gitea.
+type PoolWorkers struct {
+       PID        int64
+       Workers    int
+       Start      time.Time
+       Timeout    time.Time
+       HasTimeout bool
+       Cancel     context.CancelFunc
+}
+
+// PoolWorkersList implements the sort.Interface
+type PoolWorkersList []*PoolWorkers
+
+func init() {
+       _ = GetManager()
+}
+
+// GetManager returns a Manager and initializes one as singleton if there's none yet
+func GetManager() *Manager {
+       if manager == nil {
+               manager = &Manager{
+                       Queues: make(map[int64]*ManagedQueue),
+               }
+       }
+       return manager
+}
+
+// Add adds a queue to this manager
+func (m *Manager) Add(queue Queue,
+       t Type,
+       configuration,
+       exemplar interface{},
+       pool ManagedPool) int64 {
+
+       cfg, _ := json.Marshal(configuration)
+       mq := &ManagedQueue{
+               Queue:         queue,
+               Type:          t,
+               Configuration: string(cfg),
+               ExemplarType:  reflect.TypeOf(exemplar).String(),
+               PoolWorkers:   make(map[int64]*PoolWorkers),
+               Pool:          pool,
+       }
+       m.mutex.Lock()
+       m.counter++
+       mq.QID = m.counter
+       mq.Name = fmt.Sprintf("queue-%d", mq.QID)
+       if named, ok := queue.(Named); ok {
+               mq.Name = named.Name()
+       }
+       m.Queues[mq.QID] = mq
+       m.mutex.Unlock()
+       log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
+       return mq.QID
+}
+
+// Remove a queue from the Manager
+func (m *Manager) Remove(qid int64) {
+       m.mutex.Lock()
+       delete(m.Queues, qid)
+       m.mutex.Unlock()
+       log.Trace("Queue Manager removed: QID: %d", qid)
+
+}
+
+// GetManagedQueue by qid
+func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
+       m.mutex.Lock()
+       defer m.mutex.Unlock()
+       return m.Queues[qid]
+}
+
+// ManagedQueues returns the managed queues
+func (m *Manager) ManagedQueues() []*ManagedQueue {
+       m.mutex.Lock()
+       mqs := make([]*ManagedQueue, 0, len(m.Queues))
+       for _, mq := range m.Queues {
+               mqs = append(mqs, mq)
+       }
+       m.mutex.Unlock()
+       sort.Sort(ManagedQueueList(mqs))
+       return mqs
+}
+
+// Workers returns the poolworkers
+func (q *ManagedQueue) Workers() []*PoolWorkers {
+       q.mutex.Lock()
+       workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
+       for _, worker := range q.PoolWorkers {
+               workers = append(workers, worker)
+       }
+       q.mutex.Unlock()
+       sort.Sort(PoolWorkersList(workers))
+       return workers
+}
+
+// RegisterWorkers registers workers to this queue
+func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
+       q.mutex.Lock()
+       defer q.mutex.Unlock()
+       q.counter++
+       q.PoolWorkers[q.counter] = &PoolWorkers{
+               PID:        q.counter,
+               Workers:    number,
+               Start:      start,
+               Timeout:    timeout,
+               HasTimeout: hasTimeout,
+               Cancel:     cancel,
+       }
+       return q.counter
+}
+
+// CancelWorkers cancels pooled workers with pid
+func (q *ManagedQueue) CancelWorkers(pid int64) {
+       q.mutex.Lock()
+       pw, ok := q.PoolWorkers[pid]
+       q.mutex.Unlock()
+       if !ok {
+               return
+       }
+       pw.Cancel()
+}
+
+// RemoveWorkers deletes pooled workers with pid
+func (q *ManagedQueue) RemoveWorkers(pid int64) {
+       q.mutex.Lock()
+       pw, ok := q.PoolWorkers[pid]
+       delete(q.PoolWorkers, pid)
+       q.mutex.Unlock()
+       if ok && pw.Cancel != nil {
+               pw.Cancel()
+       }
+}
+
+// AddWorkers adds workers to the queue if it has registered an add worker function
+func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+       if q.Pool != nil {
+               // the cancel will be added to the pool workers description above
+               return q.Pool.AddWorkers(number, timeout)
+       }
+       return nil
+}
+
+// NumberOfWorkers returns the number of workers in the queue
+func (q *ManagedQueue) NumberOfWorkers() int {
+       if q.Pool != nil {
+               return q.Pool.NumberOfWorkers()
+       }
+       return -1
+}
+
+// MaxNumberOfWorkers returns the maximum number of workers for the pool
+func (q *ManagedQueue) MaxNumberOfWorkers() int {
+       if q.Pool != nil {
+               return q.Pool.MaxNumberOfWorkers()
+       }
+       return 0
+}
+
+// BoostWorkers returns the number of workers for a boost
+func (q *ManagedQueue) BoostWorkers() int {
+       if q.Pool != nil {
+               return q.Pool.BoostWorkers()
+       }
+       return -1
+}
+
+// BoostTimeout returns the timeout of the next boost
+func (q *ManagedQueue) BoostTimeout() time.Duration {
+       if q.Pool != nil {
+               return q.Pool.BoostTimeout()
+       }
+       return 0
+}
+
+// BlockTimeout returns the timeout til the next boost
+func (q *ManagedQueue) BlockTimeout() time.Duration {
+       if q.Pool != nil {
+               return q.Pool.BlockTimeout()
+       }
+       return 0
+}
+
+// SetSettings sets the setable boost values
+func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+       if q.Pool != nil {
+               q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
+       }
+}
+
+func (l ManagedQueueList) Len() int {
+       return len(l)
+}
+
+func (l ManagedQueueList) Less(i, j int) bool {
+       return l[i].Name < l[j].Name
+}
+
+func (l ManagedQueueList) Swap(i, j int) {
+       l[i], l[j] = l[j], l[i]
+}
+
+func (l PoolWorkersList) Len() int {
+       return len(l)
+}
+
+func (l PoolWorkersList) Less(i, j int) bool {
+       return l[i].Start.Before(l[j].Start)
+}
+
+func (l PoolWorkersList) Swap(i, j int) {
+       l[i], l[j] = l[j], l[i]
+}
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
new file mode 100644 (file)
index 0000000..d458a7d
--- /dev/null
@@ -0,0 +1,133 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "reflect"
+)
+
+// ErrInvalidConfiguration is called when there is invalid configuration for a queue
+type ErrInvalidConfiguration struct {
+       cfg interface{}
+       err error
+}
+
+func (err ErrInvalidConfiguration) Error() string {
+       if err.err != nil {
+               return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err)
+       }
+       return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg)
+}
+
+// IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
+func IsErrInvalidConfiguration(err error) bool {
+       _, ok := err.(ErrInvalidConfiguration)
+       return ok
+}
+
+// Type is a type of Queue
+type Type string
+
+// Data defines an type of queuable data
+type Data interface{}
+
+// HandlerFunc is a function that takes a variable amount of data and processes it
+type HandlerFunc func(...Data)
+
+// NewQueueFunc is a function that creates a queue
+type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
+
+// Shutdownable represents a queue that can be shutdown
+type Shutdownable interface {
+       Shutdown()
+       Terminate()
+}
+
+// Named represents a queue with a name
+type Named interface {
+       Name() string
+}
+
+// Queue defines an interface to save an issue indexer queue
+type Queue interface {
+       Run(atShutdown, atTerminate func(context.Context, func()))
+       Push(Data) error
+}
+
+// DummyQueueType is the type for the dummy queue
+const DummyQueueType Type = "dummy"
+
+// NewDummyQueue creates a new DummyQueue
+func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
+       return &DummyQueue{}, nil
+}
+
+// DummyQueue represents an empty queue
+type DummyQueue struct {
+}
+
+// Run starts to run the queue
+func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
+
+// Push pushes data to the queue
+func (b *DummyQueue) Push(Data) error {
+       return nil
+}
+
+func toConfig(exemplar, cfg interface{}) (interface{}, error) {
+       if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
+               return cfg, nil
+       }
+
+       configBytes, ok := cfg.([]byte)
+       if !ok {
+               configStr, ok := cfg.(string)
+               if !ok {
+                       return nil, ErrInvalidConfiguration{cfg: cfg}
+               }
+               configBytes = []byte(configStr)
+       }
+       newVal := reflect.New(reflect.TypeOf(exemplar))
+       if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
+               return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
+       }
+       return newVal.Elem().Interface(), nil
+}
+
+var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
+
+// RegisteredTypes provides the list of requested types of queues
+func RegisteredTypes() []Type {
+       types := make([]Type, len(queuesMap))
+       i := 0
+       for key := range queuesMap {
+               types[i] = key
+               i++
+       }
+       return types
+}
+
+// RegisteredTypesAsString provides the list of requested types of queues
+func RegisteredTypesAsString() []string {
+       types := make([]string, len(queuesMap))
+       i := 0
+       for key := range queuesMap {
+               types[i] = string(key)
+               i++
+       }
+       return types
+}
+
+// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
+func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
+       newFn, ok := queuesMap[queueType]
+       if !ok {
+               return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
+       }
+       return newFn(handlerFunc, opts, exemplar)
+}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
new file mode 100644 (file)
index 0000000..c8f8a53
--- /dev/null
@@ -0,0 +1,106 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "fmt"
+       "reflect"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+)
+
+// ChannelQueueType is the type for channel queue
+const ChannelQueueType Type = "channel"
+
+// ChannelQueueConfiguration is the configuration for a ChannelQueue
+type ChannelQueueConfiguration struct {
+       QueueLength  int
+       BatchLength  int
+       Workers      int
+       MaxWorkers   int
+       BlockTimeout time.Duration
+       BoostTimeout time.Duration
+       BoostWorkers int
+       Name         string
+}
+
+// ChannelQueue implements
+type ChannelQueue struct {
+       pool     *WorkerPool
+       exemplar interface{}
+       workers  int
+       name     string
+}
+
+// NewChannelQueue create a memory channel queue
+func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+       configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
+       if err != nil {
+               return nil, err
+       }
+       config := configInterface.(ChannelQueueConfiguration)
+       if config.BatchLength == 0 {
+               config.BatchLength = 1
+       }
+       dataChan := make(chan Data, config.QueueLength)
+
+       ctx, cancel := context.WithCancel(context.Background())
+       queue := &ChannelQueue{
+               pool: &WorkerPool{
+                       baseCtx:            ctx,
+                       cancel:             cancel,
+                       batchLength:        config.BatchLength,
+                       handle:             handle,
+                       dataChan:           dataChan,
+                       blockTimeout:       config.BlockTimeout,
+                       boostTimeout:       config.BoostTimeout,
+                       boostWorkers:       config.BoostWorkers,
+                       maxNumberOfWorkers: config.MaxWorkers,
+               },
+               exemplar: exemplar,
+               workers:  config.Workers,
+               name:     config.Name,
+       }
+       queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool)
+       return queue, nil
+}
+
+// Run starts to run the queue
+func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+       atShutdown(context.Background(), func() {
+               log.Warn("ChannelQueue: %s is not shutdownable!", c.name)
+       })
+       atTerminate(context.Background(), func() {
+               log.Warn("ChannelQueue: %s is not terminatable!", c.name)
+       })
+       go func() {
+               _ = c.pool.AddWorkers(c.workers, 0)
+       }()
+}
+
+// Push will push data into the queue
+func (c *ChannelQueue) Push(data Data) error {
+       if c.exemplar != nil {
+               // Assert data is of same type as r.exemplar
+               t := reflect.TypeOf(data)
+               exemplarType := reflect.TypeOf(c.exemplar)
+               if !t.AssignableTo(exemplarType) || data == nil {
+                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
+               }
+       }
+       c.pool.Push(data)
+       return nil
+}
+
+// Name returns the name of this queue
+func (c *ChannelQueue) Name() string {
+       return c.name
+}
+
+func init() {
+       queuesMap[ChannelQueueType] = NewChannelQueue
+}
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
new file mode 100644 (file)
index 0000000..fafc1e3
--- /dev/null
@@ -0,0 +1,91 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestChannelQueue(t *testing.T) {
+       handleChan := make(chan *testData)
+       handle := func(data ...Data) {
+               for _, datum := range data {
+                       testDatum := datum.(*testData)
+                       handleChan <- testDatum
+               }
+       }
+
+       nilFn := func(_ context.Context, _ func()) {}
+
+       queue, err := NewChannelQueue(handle,
+               ChannelQueueConfiguration{
+                       QueueLength:  20,
+                       Workers:      1,
+                       MaxWorkers:   10,
+                       BlockTimeout: 1 * time.Second,
+                       BoostTimeout: 5 * time.Minute,
+                       BoostWorkers: 5,
+               }, &testData{})
+       assert.NoError(t, err)
+
+       go queue.Run(nilFn, nilFn)
+
+       test1 := testData{"A", 1}
+       go queue.Push(&test1)
+       result1 := <-handleChan
+       assert.Equal(t, test1.TestString, result1.TestString)
+       assert.Equal(t, test1.TestInt, result1.TestInt)
+
+       err = queue.Push(test1)
+       assert.Error(t, err)
+}
+
+func TestChannelQueue_Batch(t *testing.T) {
+       handleChan := make(chan *testData)
+       handle := func(data ...Data) {
+               assert.True(t, len(data) == 2)
+               for _, datum := range data {
+                       testDatum := datum.(*testData)
+                       handleChan <- testDatum
+               }
+       }
+
+       nilFn := func(_ context.Context, _ func()) {}
+
+       queue, err := NewChannelQueue(handle,
+               ChannelQueueConfiguration{
+                       QueueLength:  20,
+                       BatchLength:  2,
+                       Workers:      1,
+                       MaxWorkers:   10,
+                       BlockTimeout: 1 * time.Second,
+                       BoostTimeout: 5 * time.Minute,
+                       BoostWorkers: 5,
+               }, &testData{})
+       assert.NoError(t, err)
+
+       go queue.Run(nilFn, nilFn)
+
+       test1 := testData{"A", 1}
+       test2 := testData{"B", 2}
+
+       queue.Push(&test1)
+       go queue.Push(&test2)
+
+       result1 := <-handleChan
+       assert.Equal(t, test1.TestString, result1.TestString)
+       assert.Equal(t, test1.TestInt, result1.TestInt)
+
+       result2 := <-handleChan
+       assert.Equal(t, test2.TestString, result2.TestString)
+       assert.Equal(t, test2.TestInt, result2.TestInt)
+
+       err = queue.Push(test1)
+       assert.Error(t, err)
+}
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
new file mode 100644 (file)
index 0000000..98e7b24
--- /dev/null
@@ -0,0 +1,213 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "reflect"
+       "sync"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+
+       "gitea.com/lunny/levelqueue"
+)
+
+// LevelQueueType is the type for level queue
+const LevelQueueType Type = "level"
+
+// LevelQueueConfiguration is the configuration for a LevelQueue
+type LevelQueueConfiguration struct {
+       DataDir      string
+       QueueLength  int
+       BatchLength  int
+       Workers      int
+       MaxWorkers   int
+       BlockTimeout time.Duration
+       BoostTimeout time.Duration
+       BoostWorkers int
+       Name         string
+}
+
+// LevelQueue implements a disk library queue
+type LevelQueue struct {
+       pool       *WorkerPool
+       queue      *levelqueue.Queue
+       closed     chan struct{}
+       terminated chan struct{}
+       lock       sync.Mutex
+       exemplar   interface{}
+       workers    int
+       name       string
+}
+
+// NewLevelQueue creates a ledis local queue
+func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+       configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
+       if err != nil {
+               return nil, err
+       }
+       config := configInterface.(LevelQueueConfiguration)
+
+       internal, err := levelqueue.Open(config.DataDir)
+       if err != nil {
+               return nil, err
+       }
+
+       dataChan := make(chan Data, config.QueueLength)
+       ctx, cancel := context.WithCancel(context.Background())
+
+       queue := &LevelQueue{
+               pool: &WorkerPool{
+                       baseCtx:            ctx,
+                       cancel:             cancel,
+                       batchLength:        config.BatchLength,
+                       handle:             handle,
+                       dataChan:           dataChan,
+                       blockTimeout:       config.BlockTimeout,
+                       boostTimeout:       config.BoostTimeout,
+                       boostWorkers:       config.BoostWorkers,
+                       maxNumberOfWorkers: config.MaxWorkers,
+               },
+               queue:      internal,
+               exemplar:   exemplar,
+               closed:     make(chan struct{}),
+               terminated: make(chan struct{}),
+               workers:    config.Workers,
+               name:       config.Name,
+       }
+       queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool)
+       return queue, nil
+}
+
+// Run starts to run the queue
+func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+       atShutdown(context.Background(), l.Shutdown)
+       atTerminate(context.Background(), l.Terminate)
+
+       go func() {
+               _ = l.pool.AddWorkers(l.workers, 0)
+       }()
+
+       go l.readToChan()
+
+       log.Trace("LevelQueue: %s Waiting til closed", l.name)
+       <-l.closed
+
+       log.Trace("LevelQueue: %s Waiting til done", l.name)
+       l.pool.Wait()
+
+       log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
+       ctx, cancel := context.WithCancel(context.Background())
+       atTerminate(ctx, cancel)
+       l.pool.CleanUp(ctx)
+       cancel()
+       log.Trace("LevelQueue: %s Cleaned", l.name)
+
+}
+
+func (l *LevelQueue) readToChan() {
+       for {
+               select {
+               case <-l.closed:
+                       // tell the pool to shutdown.
+                       l.pool.cancel()
+                       return
+               default:
+                       bs, err := l.queue.RPop()
+                       if err != nil {
+                               if err != levelqueue.ErrNotFound {
+                                       log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
+                               }
+                               time.Sleep(time.Millisecond * 100)
+                               continue
+                       }
+
+                       if len(bs) == 0 {
+                               time.Sleep(time.Millisecond * 100)
+                               continue
+                       }
+
+                       var data Data
+                       if l.exemplar != nil {
+                               t := reflect.TypeOf(l.exemplar)
+                               n := reflect.New(t)
+                               ne := n.Elem()
+                               err = json.Unmarshal(bs, ne.Addr().Interface())
+                               data = ne.Interface().(Data)
+                       } else {
+                               err = json.Unmarshal(bs, &data)
+                       }
+                       if err != nil {
+                               log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
+                               time.Sleep(time.Millisecond * 100)
+                               continue
+                       }
+
+                       log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
+                       l.pool.Push(data)
+
+               }
+       }
+}
+
+// Push will push the indexer data to queue
+func (l *LevelQueue) Push(data Data) error {
+       if l.exemplar != nil {
+               // Assert data is of same type as r.exemplar
+               value := reflect.ValueOf(data)
+               t := value.Type()
+               exemplarType := reflect.ValueOf(l.exemplar).Type()
+               if !t.AssignableTo(exemplarType) || data == nil {
+                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
+               }
+       }
+       bs, err := json.Marshal(data)
+       if err != nil {
+               return err
+       }
+       return l.queue.LPush(bs)
+}
+
+// Shutdown this queue and stop processing
+func (l *LevelQueue) Shutdown() {
+       l.lock.Lock()
+       defer l.lock.Unlock()
+       log.Trace("LevelQueue: %s Shutdown", l.name)
+       select {
+       case <-l.closed:
+       default:
+               close(l.closed)
+       }
+}
+
+// Terminate this queue and close the queue
+func (l *LevelQueue) Terminate() {
+       log.Trace("LevelQueue: %s Terminating", l.name)
+       l.Shutdown()
+       l.lock.Lock()
+       select {
+       case <-l.terminated:
+               l.lock.Unlock()
+       default:
+               close(l.terminated)
+               l.lock.Unlock()
+               if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
+                       log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
+               }
+
+       }
+}
+
+// Name returns the name of this queue
+func (l *LevelQueue) Name() string {
+       return l.name
+}
+
+func init() {
+       queuesMap[LevelQueueType] = NewLevelQueue
+}
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
new file mode 100644 (file)
index 0000000..895c8ce
--- /dev/null
@@ -0,0 +1,193 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+)
+
+// PersistableChannelQueueType is the type for persistable queue
+const PersistableChannelQueueType Type = "persistable-channel"
+
+// PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
+type PersistableChannelQueueConfiguration struct {
+       Name         string
+       DataDir      string
+       BatchLength  int
+       QueueLength  int
+       Timeout      time.Duration
+       MaxAttempts  int
+       Workers      int
+       MaxWorkers   int
+       BlockTimeout time.Duration
+       BoostTimeout time.Duration
+       BoostWorkers int
+}
+
+// PersistableChannelQueue wraps a channel queue and level queue together
+type PersistableChannelQueue struct {
+       *ChannelQueue
+       delayedStarter
+       closed chan struct{}
+}
+
+// NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
+// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
+func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+       configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
+       if err != nil {
+               return nil, err
+       }
+       config := configInterface.(PersistableChannelQueueConfiguration)
+
+       channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
+               QueueLength:  config.QueueLength,
+               BatchLength:  config.BatchLength,
+               Workers:      config.Workers,
+               MaxWorkers:   config.MaxWorkers,
+               BlockTimeout: config.BlockTimeout,
+               BoostTimeout: config.BoostTimeout,
+               BoostWorkers: config.BoostWorkers,
+               Name:         config.Name + "-channel",
+       }, exemplar)
+       if err != nil {
+               return nil, err
+       }
+
+       // the level backend only needs temporary workers to catch up with the previously dropped work
+       levelCfg := LevelQueueConfiguration{
+               DataDir:      config.DataDir,
+               QueueLength:  config.QueueLength,
+               BatchLength:  config.BatchLength,
+               Workers:      1,
+               MaxWorkers:   6,
+               BlockTimeout: 1 * time.Second,
+               BoostTimeout: 5 * time.Minute,
+               BoostWorkers: 5,
+               Name:         config.Name + "-level",
+       }
+
+       levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
+       if err == nil {
+               queue := &PersistableChannelQueue{
+                       ChannelQueue: channelQueue.(*ChannelQueue),
+                       delayedStarter: delayedStarter{
+                               internal: levelQueue.(*LevelQueue),
+                               name:     config.Name,
+                       },
+                       closed: make(chan struct{}),
+               }
+               _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+               return queue, nil
+       }
+       if IsErrInvalidConfiguration(err) {
+               // Retrying ain't gonna make this any better...
+               return nil, ErrInvalidConfiguration{cfg: cfg}
+       }
+
+       queue := &PersistableChannelQueue{
+               ChannelQueue: channelQueue.(*ChannelQueue),
+               delayedStarter: delayedStarter{
+                       cfg:         levelCfg,
+                       underlying:  LevelQueueType,
+                       timeout:     config.Timeout,
+                       maxAttempts: config.MaxAttempts,
+                       name:        config.Name,
+               },
+               closed: make(chan struct{}),
+       }
+       _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
+       return queue, nil
+}
+
+// Name returns the name of this queue
+func (p *PersistableChannelQueue) Name() string {
+       return p.delayedStarter.name
+}
+
+// Push will push the indexer data to queue
+func (p *PersistableChannelQueue) Push(data Data) error {
+       select {
+       case <-p.closed:
+               return p.internal.Push(data)
+       default:
+               return p.ChannelQueue.Push(data)
+       }
+}
+
+// Run starts to run the queue
+func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+       p.lock.Lock()
+       if p.internal == nil {
+               err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
+               p.lock.Unlock()
+               if err != nil {
+                       log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
+                       return
+               }
+       } else {
+               p.lock.Unlock()
+       }
+       atShutdown(context.Background(), p.Shutdown)
+       atTerminate(context.Background(), p.Terminate)
+
+       // Just run the level queue - we shut it down later
+       go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
+
+       go func() {
+               _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
+       }()
+
+       log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
+       <-p.closed
+       log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
+       p.ChannelQueue.pool.cancel()
+       p.internal.(*LevelQueue).pool.cancel()
+       log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
+       p.ChannelQueue.pool.Wait()
+       p.internal.(*LevelQueue).pool.Wait()
+       // Redirect all remaining data in the chan to the internal channel
+       go func() {
+               log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
+               for data := range p.ChannelQueue.pool.dataChan {
+                       _ = p.internal.Push(data)
+               }
+               log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
+       }()
+       log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
+}
+
+// Shutdown processing this queue
+func (p *PersistableChannelQueue) Shutdown() {
+       log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
+       select {
+       case <-p.closed:
+       default:
+               p.lock.Lock()
+               defer p.lock.Unlock()
+               if p.internal != nil {
+                       p.internal.(*LevelQueue).Shutdown()
+               }
+               close(p.closed)
+       }
+}
+
+// Terminate this queue and close the queue
+func (p *PersistableChannelQueue) Terminate() {
+       log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
+       p.Shutdown()
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       if p.internal != nil {
+               p.internal.(*LevelQueue).Terminate()
+       }
+}
+
+func init() {
+       queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
+}
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
new file mode 100644 (file)
index 0000000..4ef6896
--- /dev/null
@@ -0,0 +1,117 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "io/ioutil"
+       "os"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestPersistableChannelQueue(t *testing.T) {
+       handleChan := make(chan *testData)
+       handle := func(data ...Data) {
+               assert.True(t, len(data) == 2)
+               for _, datum := range data {
+                       testDatum := datum.(*testData)
+                       handleChan <- testDatum
+               }
+       }
+
+       queueShutdown := []func(){}
+       queueTerminate := []func(){}
+
+       tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-test-data")
+       assert.NoError(t, err)
+       defer os.RemoveAll(tmpDir)
+
+       queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+               DataDir:     tmpDir,
+               BatchLength: 2,
+               QueueLength: 20,
+               Workers:     1,
+               MaxWorkers:  10,
+       }, &testData{})
+       assert.NoError(t, err)
+
+       go queue.Run(func(_ context.Context, shutdown func()) {
+               queueShutdown = append(queueShutdown, shutdown)
+       }, func(_ context.Context, terminate func()) {
+               queueTerminate = append(queueTerminate, terminate)
+       })
+
+       test1 := testData{"A", 1}
+       test2 := testData{"B", 2}
+
+       err = queue.Push(&test1)
+       assert.NoError(t, err)
+       go func() {
+               err = queue.Push(&test2)
+               assert.NoError(t, err)
+       }()
+
+       result1 := <-handleChan
+       assert.Equal(t, test1.TestString, result1.TestString)
+       assert.Equal(t, test1.TestInt, result1.TestInt)
+
+       result2 := <-handleChan
+       assert.Equal(t, test2.TestString, result2.TestString)
+       assert.Equal(t, test2.TestInt, result2.TestInt)
+
+       err = queue.Push(test1)
+       assert.Error(t, err)
+
+       for _, callback := range queueShutdown {
+               callback()
+       }
+       time.Sleep(200 * time.Millisecond)
+       err = queue.Push(&test1)
+       assert.NoError(t, err)
+       err = queue.Push(&test2)
+       assert.NoError(t, err)
+       select {
+       case <-handleChan:
+               assert.Fail(t, "Handler processing should have stopped")
+       default:
+       }
+       for _, callback := range queueTerminate {
+               callback()
+       }
+
+       // Reopen queue
+       queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+               DataDir:     tmpDir,
+               BatchLength: 2,
+               QueueLength: 20,
+               Workers:     1,
+               MaxWorkers:  10,
+       }, &testData{})
+       assert.NoError(t, err)
+
+       go queue.Run(func(_ context.Context, shutdown func()) {
+               queueShutdown = append(queueShutdown, shutdown)
+       }, func(_ context.Context, terminate func()) {
+               queueTerminate = append(queueTerminate, terminate)
+       })
+
+       result3 := <-handleChan
+       assert.Equal(t, test1.TestString, result3.TestString)
+       assert.Equal(t, test1.TestInt, result3.TestInt)
+
+       result4 := <-handleChan
+       assert.Equal(t, test2.TestString, result4.TestString)
+       assert.Equal(t, test2.TestInt, result4.TestInt)
+       for _, callback := range queueShutdown {
+               callback()
+       }
+       for _, callback := range queueTerminate {
+               callback()
+       }
+
+}
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
new file mode 100644 (file)
index 0000000..c5959d6
--- /dev/null
@@ -0,0 +1,126 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "io/ioutil"
+       "os"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestLevelQueue(t *testing.T) {
+       handleChan := make(chan *testData)
+       handle := func(data ...Data) {
+               assert.True(t, len(data) == 2)
+               for _, datum := range data {
+                       testDatum := datum.(*testData)
+                       handleChan <- testDatum
+               }
+       }
+
+       queueShutdown := []func(){}
+       queueTerminate := []func(){}
+
+       tmpDir, err := ioutil.TempDir("", "level-queue-test-data")
+       assert.NoError(t, err)
+       defer os.RemoveAll(tmpDir)
+
+       queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
+               DataDir:      tmpDir,
+               BatchLength:  2,
+               Workers:      1,
+               MaxWorkers:   10,
+               QueueLength:  20,
+               BlockTimeout: 1 * time.Second,
+               BoostTimeout: 5 * time.Minute,
+               BoostWorkers: 5,
+       }, &testData{})
+       assert.NoError(t, err)
+
+       go queue.Run(func(_ context.Context, shutdown func()) {
+               queueShutdown = append(queueShutdown, shutdown)
+       }, func(_ context.Context, terminate func()) {
+               queueTerminate = append(queueTerminate, terminate)
+       })
+
+       test1 := testData{"A", 1}
+       test2 := testData{"B", 2}
+
+       err = queue.Push(&test1)
+       assert.NoError(t, err)
+       go func() {
+               err = queue.Push(&test2)
+               assert.NoError(t, err)
+       }()
+
+       result1 := <-handleChan
+       assert.Equal(t, test1.TestString, result1.TestString)
+       assert.Equal(t, test1.TestInt, result1.TestInt)
+
+       result2 := <-handleChan
+       assert.Equal(t, test2.TestString, result2.TestString)
+       assert.Equal(t, test2.TestInt, result2.TestInt)
+
+       err = queue.Push(test1)
+       assert.Error(t, err)
+
+       for _, callback := range queueShutdown {
+               callback()
+       }
+       time.Sleep(200 * time.Millisecond)
+       err = queue.Push(&test1)
+       assert.NoError(t, err)
+       err = queue.Push(&test2)
+       assert.NoError(t, err)
+       select {
+       case <-handleChan:
+               assert.Fail(t, "Handler processing should have stopped")
+       default:
+       }
+       for _, callback := range queueTerminate {
+               callback()
+       }
+
+       // Reopen queue
+       queue, err = NewWrappedQueue(handle,
+               WrappedQueueConfiguration{
+                       Underlying: LevelQueueType,
+                       Config: LevelQueueConfiguration{
+                               DataDir:      tmpDir,
+                               BatchLength:  2,
+                               Workers:      1,
+                               MaxWorkers:   10,
+                               QueueLength:  20,
+                               BlockTimeout: 1 * time.Second,
+                               BoostTimeout: 5 * time.Minute,
+                               BoostWorkers: 5,
+                       },
+               }, &testData{})
+       assert.NoError(t, err)
+
+       go queue.Run(func(_ context.Context, shutdown func()) {
+               queueShutdown = append(queueShutdown, shutdown)
+       }, func(_ context.Context, terminate func()) {
+               queueTerminate = append(queueTerminate, terminate)
+       })
+
+       result3 := <-handleChan
+       assert.Equal(t, test1.TestString, result3.TestString)
+       assert.Equal(t, test1.TestInt, result3.TestInt)
+
+       result4 := <-handleChan
+       assert.Equal(t, test2.TestString, result4.TestString)
+       assert.Equal(t, test2.TestInt, result4.TestInt)
+       for _, callback := range queueShutdown {
+               callback()
+       }
+       for _, callback := range queueTerminate {
+               callback()
+       }
+}
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
new file mode 100644 (file)
index 0000000..14e6893
--- /dev/null
@@ -0,0 +1,234 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "reflect"
+       "strings"
+       "sync"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+
+       "github.com/go-redis/redis"
+)
+
+// RedisQueueType is the type for redis queue
+const RedisQueueType Type = "redis"
+
+type redisClient interface {
+       RPush(key string, args ...interface{}) *redis.IntCmd
+       LPop(key string) *redis.StringCmd
+       Ping() *redis.StatusCmd
+       Close() error
+}
+
+// RedisQueue redis queue
+type RedisQueue struct {
+       pool       *WorkerPool
+       client     redisClient
+       queueName  string
+       closed     chan struct{}
+       terminated chan struct{}
+       exemplar   interface{}
+       workers    int
+       name       string
+       lock       sync.Mutex
+}
+
+// RedisQueueConfiguration is the configuration for the redis queue
+type RedisQueueConfiguration struct {
+       Network      string
+       Addresses    string
+       Password     string
+       DBIndex      int
+       BatchLength  int
+       QueueLength  int
+       QueueName    string
+       Workers      int
+       MaxWorkers   int
+       BlockTimeout time.Duration
+       BoostTimeout time.Duration
+       BoostWorkers int
+       Name         string
+}
+
+// NewRedisQueue creates single redis or cluster redis queue
+func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+       configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
+       if err != nil {
+               return nil, err
+       }
+       config := configInterface.(RedisQueueConfiguration)
+
+       dbs := strings.Split(config.Addresses, ",")
+
+       dataChan := make(chan Data, config.QueueLength)
+       ctx, cancel := context.WithCancel(context.Background())
+
+       var queue = &RedisQueue{
+               pool: &WorkerPool{
+                       baseCtx:            ctx,
+                       cancel:             cancel,
+                       batchLength:        config.BatchLength,
+                       handle:             handle,
+                       dataChan:           dataChan,
+                       blockTimeout:       config.BlockTimeout,
+                       boostTimeout:       config.BoostTimeout,
+                       boostWorkers:       config.BoostWorkers,
+                       maxNumberOfWorkers: config.MaxWorkers,
+               },
+               queueName: config.QueueName,
+               exemplar:  exemplar,
+               closed:    make(chan struct{}),
+               workers:   config.Workers,
+               name:      config.Name,
+       }
+       if len(dbs) == 0 {
+               return nil, errors.New("no redis host specified")
+       } else if len(dbs) == 1 {
+               queue.client = redis.NewClient(&redis.Options{
+                       Network:  config.Network,
+                       Addr:     strings.TrimSpace(dbs[0]), // use default Addr
+                       Password: config.Password,           // no password set
+                       DB:       config.DBIndex,            // use default DB
+               })
+       } else {
+               queue.client = redis.NewClusterClient(&redis.ClusterOptions{
+                       Addrs: dbs,
+               })
+       }
+       if err := queue.client.Ping().Err(); err != nil {
+               return nil, err
+       }
+       queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool)
+
+       return queue, nil
+}
+
+// Run runs the redis queue
+func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+       atShutdown(context.Background(), r.Shutdown)
+       atTerminate(context.Background(), r.Terminate)
+
+       go func() {
+               _ = r.pool.AddWorkers(r.workers, 0)
+       }()
+
+       go r.readToChan()
+
+       log.Trace("RedisQueue: %s Waiting til closed", r.name)
+       <-r.closed
+       log.Trace("RedisQueue: %s Waiting til done", r.name)
+       r.pool.Wait()
+
+       log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
+       ctx, cancel := context.WithCancel(context.Background())
+       atTerminate(ctx, cancel)
+       r.pool.CleanUp(ctx)
+       cancel()
+}
+
+func (r *RedisQueue) readToChan() {
+       for {
+               select {
+               case <-r.closed:
+                       // tell the pool to shutdown
+                       r.pool.cancel()
+                       return
+               default:
+                       bs, err := r.client.LPop(r.queueName).Bytes()
+                       if err != nil && err != redis.Nil {
+                               log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
+                               time.Sleep(time.Millisecond * 100)
+                               continue
+                       }
+
+                       if len(bs) == 0 {
+                               time.Sleep(time.Millisecond * 100)
+                               continue
+                       }
+
+                       var data Data
+                       if r.exemplar != nil {
+                               t := reflect.TypeOf(r.exemplar)
+                               n := reflect.New(t)
+                               ne := n.Elem()
+                               err = json.Unmarshal(bs, ne.Addr().Interface())
+                               data = ne.Interface().(Data)
+                       } else {
+                               err = json.Unmarshal(bs, &data)
+                       }
+                       if err != nil {
+                               log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
+                               time.Sleep(time.Millisecond * 100)
+                               continue
+                       }
+
+                       log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
+                       r.pool.Push(data)
+               }
+       }
+}
+
+// Push implements Queue
+func (r *RedisQueue) Push(data Data) error {
+       if r.exemplar != nil {
+               // Assert data is of same type as r.exemplar
+               value := reflect.ValueOf(data)
+               t := value.Type()
+               exemplarType := reflect.ValueOf(r.exemplar).Type()
+               if !t.AssignableTo(exemplarType) || data == nil {
+                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
+               }
+       }
+       bs, err := json.Marshal(data)
+       if err != nil {
+               return err
+       }
+       return r.client.RPush(r.queueName, bs).Err()
+}
+
+// Shutdown processing from this queue
+func (r *RedisQueue) Shutdown() {
+       log.Trace("Shutdown: %s", r.name)
+       r.lock.Lock()
+       select {
+       case <-r.closed:
+       default:
+               close(r.closed)
+       }
+       r.lock.Unlock()
+}
+
+// Terminate this queue and close the queue
+func (r *RedisQueue) Terminate() {
+       log.Trace("Terminating: %s", r.name)
+       r.Shutdown()
+       r.lock.Lock()
+       select {
+       case <-r.terminated:
+               r.lock.Unlock()
+       default:
+               close(r.terminated)
+               r.lock.Unlock()
+               if err := r.client.Close(); err != nil {
+                       log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
+               }
+       }
+}
+
+// Name returns the name of this queue
+func (r *RedisQueue) Name() string {
+       return r.name
+}
+
+func init() {
+       queuesMap[RedisQueueType] = NewRedisQueue
+}
diff --git a/modules/queue/queue_test.go b/modules/queue/queue_test.go
new file mode 100644 (file)
index 0000000..3608f68
--- /dev/null
@@ -0,0 +1,43 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "encoding/json"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+type testData struct {
+       TestString string
+       TestInt    int
+}
+
+func TestToConfig(t *testing.T) {
+       cfg := testData{
+               TestString: "Config",
+               TestInt:    10,
+       }
+       exemplar := testData{}
+
+       cfg2I, err := toConfig(exemplar, cfg)
+       assert.NoError(t, err)
+       cfg2, ok := (cfg2I).(testData)
+       assert.True(t, ok)
+       assert.NotEqual(t, cfg2, exemplar)
+       assert.Equal(t, &cfg, &cfg2)
+
+       cfgString, err := json.Marshal(cfg)
+       assert.NoError(t, err)
+
+       cfg3I, err := toConfig(exemplar, cfgString)
+       assert.NoError(t, err)
+       cfg3, ok := (cfg3I).(testData)
+       assert.True(t, ok)
+       assert.Equal(t, cfg.TestString, cfg3.TestString)
+       assert.Equal(t, cfg.TestInt, cfg3.TestInt)
+       assert.NotEqual(t, cfg3, exemplar)
+}
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go
new file mode 100644 (file)
index 0000000..d0b93b5
--- /dev/null
@@ -0,0 +1,206 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "fmt"
+       "reflect"
+       "sync"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+)
+
+// WrappedQueueType is the type for a wrapped delayed starting queue
+const WrappedQueueType Type = "wrapped"
+
+// WrappedQueueConfiguration is the configuration for a WrappedQueue
+type WrappedQueueConfiguration struct {
+       Underlying  Type
+       Timeout     time.Duration
+       MaxAttempts int
+       Config      interface{}
+       QueueLength int
+       Name        string
+}
+
+type delayedStarter struct {
+       lock        sync.Mutex
+       internal    Queue
+       underlying  Type
+       cfg         interface{}
+       timeout     time.Duration
+       maxAttempts int
+       name        string
+}
+
+// setInternal must be called with the lock locked.
+func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error {
+       var ctx context.Context
+       var cancel context.CancelFunc
+       if q.timeout > 0 {
+               ctx, cancel = context.WithTimeout(context.Background(), q.timeout)
+       } else {
+               ctx, cancel = context.WithCancel(context.Background())
+       }
+
+       defer cancel()
+       // Ensure we also stop at shutdown
+       atShutdown(ctx, func() {
+               cancel()
+       })
+
+       i := 1
+       for q.internal == nil {
+               select {
+               case <-ctx.Done():
+                       return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name)
+               default:
+                       queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
+                       if err == nil {
+                               q.internal = queue
+                               q.lock.Unlock()
+                               break
+                       }
+                       if err.Error() != "resource temporarily unavailable" {
+                               log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %v error: %v", i, q.underlying, q.name, q.cfg, err)
+                       }
+                       i++
+                       if q.maxAttempts > 0 && i > q.maxAttempts {
+                               return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
+                       }
+                       sleepTime := 100 * time.Millisecond
+                       if q.timeout > 0 && q.maxAttempts > 0 {
+                               sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts)
+                       }
+                       t := time.NewTimer(sleepTime)
+                       select {
+                       case <-ctx.Done():
+                               t.Stop()
+                       case <-t.C:
+                       }
+               }
+       }
+       return nil
+}
+
+// WrappedQueue wraps a delayed starting queue
+type WrappedQueue struct {
+       delayedStarter
+       handle   HandlerFunc
+       exemplar interface{}
+       channel  chan Data
+}
+
+// NewWrappedQueue will attempt to create a queue of the provided type,
+// but if there is a problem creating this queue it will instead create
+// a WrappedQueue with delayed startup of the queue instead and a
+// channel which will be redirected to the queue
+func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
+       configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg)
+       if err != nil {
+               return nil, err
+       }
+       config := configInterface.(WrappedQueueConfiguration)
+
+       queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
+       if err == nil {
+               // Just return the queue there is no need to wrap
+               return queue, nil
+       }
+       if IsErrInvalidConfiguration(err) {
+               // Retrying ain't gonna make this any better...
+               return nil, ErrInvalidConfiguration{cfg: cfg}
+       }
+
+       queue = &WrappedQueue{
+               handle:   handle,
+               channel:  make(chan Data, config.QueueLength),
+               exemplar: exemplar,
+               delayedStarter: delayedStarter{
+                       cfg:         config.Config,
+                       underlying:  config.Underlying,
+                       timeout:     config.Timeout,
+                       maxAttempts: config.MaxAttempts,
+                       name:        config.Name,
+               },
+       }
+       _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil)
+       return queue, nil
+}
+
+// Name returns the name of the queue
+func (q *WrappedQueue) Name() string {
+       return q.name + "-wrapper"
+}
+
+// Push will push the data to the internal channel checking it against the exemplar
+func (q *WrappedQueue) Push(data Data) error {
+       if q.exemplar != nil {
+               // Assert data is of same type as r.exemplar
+               value := reflect.ValueOf(data)
+               t := value.Type()
+               exemplarType := reflect.ValueOf(q.exemplar).Type()
+               if !t.AssignableTo(exemplarType) || data == nil {
+                       return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+               }
+       }
+       q.channel <- data
+       return nil
+}
+
+// Run starts to run the queue and attempts to create the internal queue
+func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
+       q.lock.Lock()
+       if q.internal == nil {
+               err := q.setInternal(atShutdown, q.handle, q.exemplar)
+               q.lock.Unlock()
+               if err != nil {
+                       log.Fatal("Unable to set the internal queue for %s Error: %v", q.Name(), err)
+                       return
+               }
+               go func() {
+                       for data := range q.channel {
+                               _ = q.internal.Push(data)
+                       }
+               }()
+       } else {
+               q.lock.Unlock()
+       }
+
+       q.internal.Run(atShutdown, atTerminate)
+       log.Trace("WrappedQueue: %s Done", q.name)
+}
+
+// Shutdown this queue and stop processing
+func (q *WrappedQueue) Shutdown() {
+       log.Trace("WrappedQueue: %s Shutdown", q.name)
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       if q.internal == nil {
+               return
+       }
+       if shutdownable, ok := q.internal.(Shutdownable); ok {
+               shutdownable.Shutdown()
+       }
+}
+
+// Terminate this queue and close the queue
+func (q *WrappedQueue) Terminate() {
+       log.Trace("WrappedQueue: %s Terminating", q.name)
+       q.lock.Lock()
+       defer q.lock.Unlock()
+       if q.internal == nil {
+               return
+       }
+       if shutdownable, ok := q.internal.(Shutdownable); ok {
+               shutdownable.Terminate()
+       }
+}
+
+func init() {
+       queuesMap[WrappedQueueType] = NewWrappedQueue
+}
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
new file mode 100644 (file)
index 0000000..d5a6b41
--- /dev/null
@@ -0,0 +1,75 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "encoding/json"
+       "fmt"
+
+       "code.gitea.io/gitea/modules/log"
+       "code.gitea.io/gitea/modules/setting"
+)
+
+func validType(t string) (Type, error) {
+       if len(t) == 0 {
+               return PersistableChannelQueueType, nil
+       }
+       for _, typ := range RegisteredTypes() {
+               if t == string(typ) {
+                       return typ, nil
+               }
+       }
+       return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
+}
+
+// CreateQueue for name with provided handler and exemplar
+func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
+       q := setting.GetQueueSettings(name)
+       opts := make(map[string]interface{})
+       opts["Name"] = name
+       opts["QueueLength"] = q.Length
+       opts["BatchLength"] = q.BatchLength
+       opts["DataDir"] = q.DataDir
+       opts["Addresses"] = q.Addresses
+       opts["Network"] = q.Network
+       opts["Password"] = q.Password
+       opts["DBIndex"] = q.DBIndex
+       opts["QueueName"] = q.QueueName
+       opts["Workers"] = q.Workers
+       opts["MaxWorkers"] = q.MaxWorkers
+       opts["BlockTimeout"] = q.BlockTimeout
+       opts["BoostTimeout"] = q.BoostTimeout
+       opts["BoostWorkers"] = q.BoostWorkers
+
+       typ, err := validType(q.Type)
+       if err != nil {
+               log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
+       }
+
+       cfg, err := json.Marshal(opts)
+       if err != nil {
+               log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
+               log.Error("Unable to create queue for %s", name, err)
+               return nil
+       }
+
+       returnable, err := NewQueue(typ, handle, cfg, exemplar)
+       if q.WrapIfNecessary && err != nil {
+               log.Warn("Unable to create queue for %s: %v", name, err)
+               log.Warn("Attempting to create wrapped queue")
+               returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{
+                       Underlying:  Type(q.Type),
+                       Timeout:     q.Timeout,
+                       MaxAttempts: q.MaxAttempts,
+                       Config:      cfg,
+                       QueueLength: q.Length,
+               }, exemplar)
+       }
+       if err != nil {
+               log.Error("Unable to create queue for %s: %v", name, err)
+               return nil
+       }
+       return returnable
+}
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
new file mode 100644 (file)
index 0000000..25fc7dd
--- /dev/null
@@ -0,0 +1,325 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+       "context"
+       "sync"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+)
+
+// WorkerPool takes
+type WorkerPool struct {
+       lock               sync.Mutex
+       baseCtx            context.Context
+       cancel             context.CancelFunc
+       cond               *sync.Cond
+       qid                int64
+       maxNumberOfWorkers int
+       numberOfWorkers    int
+       batchLength        int
+       handle             HandlerFunc
+       dataChan           chan Data
+       blockTimeout       time.Duration
+       boostTimeout       time.Duration
+       boostWorkers       int
+}
+
+// Push pushes the data to the internal channel
+func (p *WorkerPool) Push(data Data) {
+       p.lock.Lock()
+       if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
+               p.lock.Unlock()
+               p.pushBoost(data)
+       } else {
+               p.lock.Unlock()
+               p.dataChan <- data
+       }
+}
+
+func (p *WorkerPool) pushBoost(data Data) {
+       select {
+       case p.dataChan <- data:
+       default:
+               p.lock.Lock()
+               if p.blockTimeout <= 0 {
+                       p.lock.Unlock()
+                       p.dataChan <- data
+                       return
+               }
+               ourTimeout := p.blockTimeout
+               timer := time.NewTimer(p.blockTimeout)
+               p.lock.Unlock()
+               select {
+               case p.dataChan <- data:
+                       if timer.Stop() {
+                               select {
+                               case <-timer.C:
+                               default:
+                               }
+                       }
+               case <-timer.C:
+                       p.lock.Lock()
+                       if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) {
+                               p.lock.Unlock()
+                               p.dataChan <- data
+                               return
+                       }
+                       p.blockTimeout *= 2
+                       ctx, cancel := context.WithCancel(p.baseCtx)
+                       mq := GetManager().GetManagedQueue(p.qid)
+                       boost := p.boostWorkers
+                       if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
+                               boost = p.maxNumberOfWorkers - p.numberOfWorkers
+                       }
+                       if mq != nil {
+                               log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
+
+                               start := time.Now()
+                               pid := mq.RegisterWorkers(boost, start, false, start, cancel)
+                               go func() {
+                                       <-ctx.Done()
+                                       mq.RemoveWorkers(pid)
+                                       cancel()
+                               }()
+                       } else {
+                               log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
+                       }
+                       go func() {
+                               <-time.After(p.boostTimeout)
+                               cancel()
+                               p.lock.Lock()
+                               p.blockTimeout /= 2
+                               p.lock.Unlock()
+                       }()
+                       p.addWorkers(ctx, boost)
+                       p.lock.Unlock()
+                       p.dataChan <- data
+               }
+       }
+}
+
+// NumberOfWorkers returns the number of current workers in the pool
+func (p *WorkerPool) NumberOfWorkers() int {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       return p.numberOfWorkers
+}
+
+// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
+func (p *WorkerPool) MaxNumberOfWorkers() int {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       return p.maxNumberOfWorkers
+}
+
+// BoostWorkers returns the number of workers for a boost
+func (p *WorkerPool) BoostWorkers() int {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       return p.boostWorkers
+}
+
+// BoostTimeout returns the timeout of the next boost
+func (p *WorkerPool) BoostTimeout() time.Duration {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       return p.boostTimeout
+}
+
+// BlockTimeout returns the timeout til the next boost
+func (p *WorkerPool) BlockTimeout() time.Duration {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       return p.blockTimeout
+}
+
+// SetSettings sets the setable boost values
+func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       p.maxNumberOfWorkers = maxNumberOfWorkers
+       p.boostWorkers = boostWorkers
+       p.boostTimeout = timeout
+}
+
+// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool
+// Changing this number will not change the number of current workers but will change the limit
+// for future additions
+func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       p.maxNumberOfWorkers = newMax
+}
+
+// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
+func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
+       var ctx context.Context
+       var cancel context.CancelFunc
+       start := time.Now()
+       end := start
+       hasTimeout := false
+       if timeout > 0 {
+               ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
+               end = start.Add(timeout)
+               hasTimeout = true
+       } else {
+               ctx, cancel = context.WithCancel(p.baseCtx)
+       }
+
+       mq := GetManager().GetManagedQueue(p.qid)
+       if mq != nil {
+               pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
+               go func() {
+                       <-ctx.Done()
+                       mq.RemoveWorkers(pid)
+                       cancel()
+               }()
+               log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
+       } else {
+               log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
+
+       }
+       p.addWorkers(ctx, number)
+       return cancel
+}
+
+// addWorkers adds workers to the pool
+func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
+       for i := 0; i < number; i++ {
+               p.lock.Lock()
+               if p.cond == nil {
+                       p.cond = sync.NewCond(&p.lock)
+               }
+               p.numberOfWorkers++
+               p.lock.Unlock()
+               go func() {
+                       p.doWork(ctx)
+
+                       p.lock.Lock()
+                       p.numberOfWorkers--
+                       if p.numberOfWorkers == 0 {
+                               p.cond.Broadcast()
+                       } else if p.numberOfWorkers < 0 {
+                               // numberOfWorkers can't go negative but...
+                               log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
+                               p.numberOfWorkers = 0
+                               p.cond.Broadcast()
+                       }
+                       p.lock.Unlock()
+               }()
+       }
+}
+
+// Wait for WorkerPool to finish
+func (p *WorkerPool) Wait() {
+       p.lock.Lock()
+       defer p.lock.Unlock()
+       if p.cond == nil {
+               p.cond = sync.NewCond(&p.lock)
+       }
+       if p.numberOfWorkers <= 0 {
+               return
+       }
+       p.cond.Wait()
+}
+
+// CleanUp will drain the remaining contents of the channel
+// This should be called after AddWorkers context is closed
+func (p *WorkerPool) CleanUp(ctx context.Context) {
+       log.Trace("WorkerPool: %d CleanUp", p.qid)
+       close(p.dataChan)
+       for data := range p.dataChan {
+               p.handle(data)
+               select {
+               case <-ctx.Done():
+                       log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
+                       return
+               default:
+               }
+       }
+       log.Trace("WorkerPool: %d CleanUp Done", p.qid)
+}
+
+func (p *WorkerPool) doWork(ctx context.Context) {
+       delay := time.Millisecond * 300
+       var data = make([]Data, 0, p.batchLength)
+       for {
+               select {
+               case <-ctx.Done():
+                       if len(data) > 0 {
+                               log.Trace("Handling: %d data, %v", len(data), data)
+                               p.handle(data...)
+                       }
+                       log.Trace("Worker shutting down")
+                       return
+               case datum, ok := <-p.dataChan:
+                       if !ok {
+                               // the dataChan has been closed - we should finish up:
+                               if len(data) > 0 {
+                                       log.Trace("Handling: %d data, %v", len(data), data)
+                                       p.handle(data...)
+                               }
+                               log.Trace("Worker shutting down")
+                               return
+                       }
+                       data = append(data, datum)
+                       if len(data) >= p.batchLength {
+                               log.Trace("Handling: %d data, %v", len(data), data)
+                               p.handle(data...)
+                               data = make([]Data, 0, p.batchLength)
+                       }
+               default:
+                       timer := time.NewTimer(delay)
+                       select {
+                       case <-ctx.Done():
+                               if timer.Stop() {
+                                       select {
+                                       case <-timer.C:
+                                       default:
+                                       }
+                               }
+                               if len(data) > 0 {
+                                       log.Trace("Handling: %d data, %v", len(data), data)
+                                       p.handle(data...)
+                               }
+                               log.Trace("Worker shutting down")
+                               return
+                       case datum, ok := <-p.dataChan:
+                               if timer.Stop() {
+                                       select {
+                                       case <-timer.C:
+                                       default:
+                                       }
+                               }
+                               if !ok {
+                                       // the dataChan has been closed - we should finish up:
+                                       if len(data) > 0 {
+                                               log.Trace("Handling: %d data, %v", len(data), data)
+                                               p.handle(data...)
+                                       }
+                                       log.Trace("Worker shutting down")
+                                       return
+                               }
+                               data = append(data, datum)
+                               if len(data) >= p.batchLength {
+                                       log.Trace("Handling: %d data, %v", len(data), data)
+                                       p.handle(data...)
+                                       data = make([]Data, 0, p.batchLength)
+                               }
+                       case <-timer.C:
+                               delay = time.Millisecond * 100
+                               if len(data) > 0 {
+                                       log.Trace("Handling: %d data, %v", len(data), data)
+                                       p.handle(data...)
+                                       data = make([]Data, 0, p.batchLength)
+                               }
+
+                       }
+               }
+       }
+}
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
new file mode 100644 (file)
index 0000000..5468027
--- /dev/null
@@ -0,0 +1,159 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package setting
+
+import (
+       "fmt"
+       "path"
+       "strconv"
+       "strings"
+       "time"
+
+       "code.gitea.io/gitea/modules/log"
+)
+
+// QueueSettings represent the settings for a queue from the ini
+type QueueSettings struct {
+       DataDir          string
+       Length           int
+       BatchLength      int
+       ConnectionString string
+       Type             string
+       Network          string
+       Addresses        string
+       Password         string
+       QueueName        string
+       DBIndex          int
+       WrapIfNecessary  bool
+       MaxAttempts      int
+       Timeout          time.Duration
+       Workers          int
+       MaxWorkers       int
+       BlockTimeout     time.Duration
+       BoostTimeout     time.Duration
+       BoostWorkers     int
+}
+
+// Queue settings
+var Queue = QueueSettings{}
+
+// GetQueueSettings returns the queue settings for the appropriately named queue
+func GetQueueSettings(name string) QueueSettings {
+       q := QueueSettings{}
+       sec := Cfg.Section("queue." + name)
+       // DataDir is not directly inheritable
+       q.DataDir = path.Join(Queue.DataDir, name)
+       // QueueName is not directly inheritable either
+       q.QueueName = name + Queue.QueueName
+       for _, key := range sec.Keys() {
+               switch key.Name() {
+               case "DATADIR":
+                       q.DataDir = key.MustString(q.DataDir)
+               case "QUEUE_NAME":
+                       q.QueueName = key.MustString(q.QueueName)
+               }
+       }
+       if !path.IsAbs(q.DataDir) {
+               q.DataDir = path.Join(AppDataPath, q.DataDir)
+       }
+       sec.Key("DATADIR").SetValue(q.DataDir)
+       // The rest are...
+       q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
+       q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
+       q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
+       q.Type = sec.Key("TYPE").MustString(Queue.Type)
+       q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
+       q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
+       q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
+       q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
+       q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
+       q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
+       q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
+       q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
+
+       q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
+       return q
+}
+
+// NewQueueService sets up the default settings for Queues
+// This is exported for tests to be able to use the queue
+func NewQueueService() {
+       sec := Cfg.Section("queue")
+       Queue.DataDir = sec.Key("DATADIR").MustString("queues/")
+       if !path.IsAbs(Queue.DataDir) {
+               Queue.DataDir = path.Join(AppDataPath, Queue.DataDir)
+       }
+       Queue.Length = sec.Key("LENGTH").MustInt(20)
+       Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
+       Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
+       Queue.Type = sec.Key("TYPE").MustString("")
+       Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
+       Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
+       Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
+       Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
+       Queue.Workers = sec.Key("WORKERS").MustInt(1)
+       Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
+       Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
+       Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
+       Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
+       Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
+
+       // Now handle the old issue_indexer configuration
+       section := Cfg.Section("queue.issue_indexer")
+       issueIndexerSectionMap := map[string]string{}
+       for _, key := range section.Keys() {
+               issueIndexerSectionMap[key.Name()] = key.Value()
+       }
+       if _, ok := issueIndexerSectionMap["TYPE"]; !ok {
+               switch Indexer.IssueQueueType {
+               case LevelQueueType:
+                       section.Key("TYPE").SetValue("level")
+               case ChannelQueueType:
+                       section.Key("TYPE").SetValue("persistable-channel")
+               case RedisQueueType:
+                       section.Key("TYPE").SetValue("redis")
+               default:
+                       log.Fatal("Unsupported indexer queue type: %v",
+                               Indexer.IssueQueueType)
+               }
+       }
+       if _, ok := issueIndexerSectionMap["LENGTH"]; !ok {
+               section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength))
+       }
+       if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok {
+               section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
+       }
+       if _, ok := issueIndexerSectionMap["DATADIR"]; !ok {
+               section.Key("DATADIR").SetValue(Indexer.IssueQueueDir)
+       }
+       if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok {
+               section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr)
+       }
+}
+
+// ParseQueueConnStr parses a queue connection string
+func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
+       fields := strings.Fields(connStr)
+       for _, f := range fields {
+               items := strings.SplitN(f, "=", 2)
+               if len(items) < 2 {
+                       continue
+               }
+               switch strings.ToLower(items[0]) {
+               case "network":
+                       network = items[1]
+               case "addrs":
+                       addrs = items[1]
+               case "password":
+                       password = items[1]
+               case "db":
+                       dbIdx, err = strconv.Atoi(items[1])
+                       if err != nil {
+                               return
+                       }
+               }
+       }
+       return
+}
index 2a5e37b41b448f0895d98927362dd9e7fcda6ca7..17c84d3d313fee4b09bc54a83cf83965d8c581c9 100644 (file)
@@ -1093,4 +1093,5 @@ func NewServices() {
        newMigrationsService()
        newIndexerService()
        newTaskService()
+       NewQueueService()
 }
index 97704d4a4da68418094ea8445d85c489ad7aaf07..81ed39a9fb90e91dfa7f4b4f33b623d8516ac8a5 100644 (file)
@@ -4,22 +4,15 @@
 
 package setting
 
-var (
-       // Task settings
-       Task = struct {
-               QueueType    string
-               QueueLength  int
-               QueueConnStr string
-       }{
-               QueueType:    ChannelQueueType,
-               QueueLength:  1000,
-               QueueConnStr: "addrs=127.0.0.1:6379 db=0",
-       }
-)
-
 func newTaskService() {
-       sec := Cfg.Section("task")
-       Task.QueueType = sec.Key("QUEUE_TYPE").MustString(ChannelQueueType)
-       Task.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000)
-       Task.QueueConnStr = sec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")
+       taskSec := Cfg.Section("task")
+       queueTaskSec := Cfg.Section("queue.task")
+       switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) {
+       case ChannelQueueType:
+               queueTaskSec.Key("TYPE").MustString("persistable-channel")
+       case RedisQueueType:
+               queueTaskSec.Key("TYPE").MustString("redis")
+       }
+       queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000))
+       queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0"))
 }
diff --git a/modules/task/queue.go b/modules/task/queue.go
deleted file mode 100644 (file)
index ddee0b3..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright 2019 Gitea. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import "code.gitea.io/gitea/models"
-
-// Queue defines an interface to run task queue
-type Queue interface {
-       Run() error
-       Push(*models.Task) error
-       Stop()
-}
diff --git a/modules/task/queue_channel.go b/modules/task/queue_channel.go
deleted file mode 100644 (file)
index da541f4..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import (
-       "code.gitea.io/gitea/models"
-       "code.gitea.io/gitea/modules/log"
-)
-
-var (
-       _ Queue = &ChannelQueue{}
-)
-
-// ChannelQueue implements
-type ChannelQueue struct {
-       queue chan *models.Task
-}
-
-// NewChannelQueue create a memory channel queue
-func NewChannelQueue(queueLen int) *ChannelQueue {
-       return &ChannelQueue{
-               queue: make(chan *models.Task, queueLen),
-       }
-}
-
-// Run starts to run the queue
-func (c *ChannelQueue) Run() error {
-       for task := range c.queue {
-               err := Run(task)
-               if err != nil {
-                       log.Error("Run task failed: %s", err.Error())
-               }
-       }
-       return nil
-}
-
-// Push will push the task ID to queue
-func (c *ChannelQueue) Push(task *models.Task) error {
-       c.queue <- task
-       return nil
-}
-
-// Stop stop the queue
-func (c *ChannelQueue) Stop() {
-       close(c.queue)
-}
diff --git a/modules/task/queue_redis.go b/modules/task/queue_redis.go
deleted file mode 100644 (file)
index 127de0c..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import (
-       "encoding/json"
-       "errors"
-       "strconv"
-       "strings"
-       "time"
-
-       "code.gitea.io/gitea/models"
-       "code.gitea.io/gitea/modules/log"
-
-       "github.com/go-redis/redis"
-)
-
-var (
-       _ Queue = &RedisQueue{}
-)
-
-type redisClient interface {
-       RPush(key string, args ...interface{}) *redis.IntCmd
-       LPop(key string) *redis.StringCmd
-       Ping() *redis.StatusCmd
-}
-
-// RedisQueue redis queue
-type RedisQueue struct {
-       client    redisClient
-       queueName string
-       closeChan chan bool
-}
-
-func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
-       fields := strings.Fields(connStr)
-       for _, f := range fields {
-               items := strings.SplitN(f, "=", 2)
-               if len(items) < 2 {
-                       continue
-               }
-               switch strings.ToLower(items[0]) {
-               case "addrs":
-                       addrs = items[1]
-               case "password":
-                       password = items[1]
-               case "db":
-                       dbIdx, err = strconv.Atoi(items[1])
-                       if err != nil {
-                               return
-                       }
-               }
-       }
-       return
-}
-
-// NewRedisQueue creates single redis or cluster redis queue
-func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) {
-       dbs := strings.Split(addrs, ",")
-       var queue = RedisQueue{
-               queueName: "task_queue",
-               closeChan: make(chan bool),
-       }
-       if len(dbs) == 0 {
-               return nil, errors.New("no redis host found")
-       } else if len(dbs) == 1 {
-               queue.client = redis.NewClient(&redis.Options{
-                       Addr:     strings.TrimSpace(dbs[0]), // use default Addr
-                       Password: password,                  // no password set
-                       DB:       dbIdx,                     // use default DB
-               })
-       } else {
-               // cluster will ignore db
-               queue.client = redis.NewClusterClient(&redis.ClusterOptions{
-                       Addrs:    dbs,
-                       Password: password,
-               })
-       }
-       if err := queue.client.Ping().Err(); err != nil {
-               return nil, err
-       }
-       return &queue, nil
-}
-
-// Run starts to run the queue
-func (r *RedisQueue) Run() error {
-       for {
-               select {
-               case <-r.closeChan:
-                       return nil
-               case <-time.After(time.Millisecond * 100):
-               }
-
-               bs, err := r.client.LPop(r.queueName).Bytes()
-               if err != nil {
-                       if err != redis.Nil {
-                               log.Error("LPop failed: %v", err)
-                       }
-                       time.Sleep(time.Millisecond * 100)
-                       continue
-               }
-
-               var task models.Task
-               err = json.Unmarshal(bs, &task)
-               if err != nil {
-                       log.Error("Unmarshal task failed: %s", err.Error())
-               } else {
-                       err = Run(&task)
-                       if err != nil {
-                               log.Error("Run task failed: %s", err.Error())
-                       }
-               }
-       }
-}
-
-// Push implements Queue
-func (r *RedisQueue) Push(task *models.Task) error {
-       bs, err := json.Marshal(task)
-       if err != nil {
-               return err
-       }
-       return r.client.RPush(r.queueName, bs).Err()
-}
-
-// Stop stop the queue
-func (r *RedisQueue) Stop() {
-       r.closeChan <- true
-}
index 64744afe7a4c7d6c57d0fdb5c85e519b0aa0dcb8..416f0c696a99de0f45947ffcc3e83ceb85d63abc 100644 (file)
@@ -8,14 +8,15 @@ import (
        "fmt"
 
        "code.gitea.io/gitea/models"
+       "code.gitea.io/gitea/modules/graceful"
        "code.gitea.io/gitea/modules/log"
        "code.gitea.io/gitea/modules/migrations/base"
-       "code.gitea.io/gitea/modules/setting"
+       "code.gitea.io/gitea/modules/queue"
        "code.gitea.io/gitea/modules/structs"
 )
 
 // taskQueue is a global queue of tasks
-var taskQueue Queue
+var taskQueue queue.Queue
 
 // Run a task
 func Run(t *models.Task) error {
@@ -23,38 +24,32 @@ func Run(t *models.Task) error {
        case structs.TaskTypeMigrateRepo:
                return runMigrateTask(t)
        default:
-               return fmt.Errorf("Unknow task type: %d", t.Type)
+               return fmt.Errorf("Unknown task type: %d", t.Type)
        }
 }
 
 // Init will start the service to get all unfinished tasks and run them
 func Init() error {
-       switch setting.Task.QueueType {
-       case setting.ChannelQueueType:
-               taskQueue = NewChannelQueue(setting.Task.QueueLength)
-       case setting.RedisQueueType:
-               var err error
-               addrs, pass, idx, err := parseConnStr(setting.Task.QueueConnStr)
-               if err != nil {
-                       return err
-               }
-               taskQueue, err = NewRedisQueue(addrs, pass, idx)
-               if err != nil {
-                       return err
-               }
-       default:
-               return fmt.Errorf("Unsupported task queue type: %v", setting.Task.QueueType)
+       taskQueue = queue.CreateQueue("task", handle, &models.Task{})
+
+       if taskQueue == nil {
+               return fmt.Errorf("Unable to create Task Queue")
        }
 
-       go func() {
-               if err := taskQueue.Run(); err != nil {
-                       log.Error("taskQueue.Run end failed: %v", err)
-               }
-       }()
+       go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)
 
        return nil
 }
 
+func handle(data ...queue.Data) {
+       for _, datum := range data {
+               task := datum.(*models.Task)
+               if err := Run(task); err != nil {
+                       log.Error("Run task failed: %v", err)
+               }
+       }
+}
+
 // MigrateRepository add migration repository to task
 func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error {
        task, err := models.CreateMigrateTask(doer, u, opts)
index 00221573d0dabd8d3dbed9f1ae7b7204e8b4b7ef..f6ff12250abf987c1f068fd73419f1438386fa34 100644 (file)
@@ -1410,7 +1410,7 @@ settings.protect_check_status_contexts_list = Status checks found in the last we
 settings.protect_required_approvals = Required approvals:
 settings.protect_required_approvals_desc = Allow only to merge pull request with enough positive reviews.
 settings.protect_approvals_whitelist_enabled = Restrict approvals to whitelisted users or teams
-settings.protect_approvals_whitelist_enabled_desc = Only reviews from whitelisted users or teams will count to the required approvals. Without approval whitelist, reviews from anyone with write access count to the required approvals. 
+settings.protect_approvals_whitelist_enabled_desc = Only reviews from whitelisted users or teams will count to the required approvals. Without approval whitelist, reviews from anyone with write access count to the required approvals.
 settings.protect_approvals_whitelist_users = Whitelisted reviewers:
 settings.protect_approvals_whitelist_teams = Whitelisted teams for reviews:
 settings.add_protected_branch = Enable protection
@@ -2026,6 +2026,54 @@ monitor.execute_time = Execution Time
 monitor.process.cancel = Cancel process
 monitor.process.cancel_desc =  Cancelling a process may cause data loss
 monitor.process.cancel_notices =  Cancel: <strong>%s</strong>?
+monitor.queues = Queues
+monitor.queue = Queue: %s
+monitor.queue.name = Name
+monitor.queue.type = Type
+monitor.queue.exemplar = Exemplar Type
+monitor.queue.numberworkers = Number of Workers
+monitor.queue.maxnumberworkers = Max Number of Workers
+monitor.queue.review = Review Config
+monitor.queue.review_add = Review/Add Workers
+monitor.queue.configuration = Initial Configuration
+monitor.queue.nopool.title = No Worker Pool
+monitor.queue.nopool.desc = This queue wraps other queues and does not itself have a worker pool.
+monitor.queue.wrapped.desc = A wrapped queue wraps a slow starting queue, buffering queued requests in a channel. It does not have a worker pool itself.
+monitor.queue.persistable-channel.desc = A persistable-channel wraps two queues, a channel queue that has its own worker pool and a level queue for persisted requests from previous shutdowns. It does not have a worker pool itself.
+monitor.queue.pool.timeout = Timeout
+monitor.queue.pool.addworkers.title = Add Workers
+monitor.queue.pool.addworkers.submit = Add Workers
+monitor.queue.pool.addworkers.desc = Add Workers to this pool with or without a timeout. If you set a timeout these workers will be removed from the pool after the timeout has lapsed.
+monitor.queue.pool.addworkers.numberworkers.placeholder = Number of Workers
+monitor.queue.pool.addworkers.timeout.placeholder = Set to 0 for no timeout
+monitor.queue.pool.addworkers.mustnumbergreaterzero = Number of Workers to add must be greater than zero
+monitor.queue.pool.addworkers.musttimeoutduration = Timeout must be a golang duration eg. 5m or be 0
+
+monitor.queue.settings.title = Pool Settings
+monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups.
+monitor.queue.settings.timeout = Boost Timeout
+monitor.queue.settings.timeout.placeholder = Currently %[1]v
+monitor.queue.settings.timeout.error = Timeout must be a golang duration eg. 5m or be 0
+monitor.queue.settings.numberworkers = Boost Number of Workers
+monitor.queue.settings.numberworkers.placeholder = Currently %[1]d
+monitor.queue.settings.numberworkers.error = Number of Workers to add must be greater than or equal to zero
+monitor.queue.settings.maxnumberworkers = Max Number of workers
+monitor.queue.settings.maxnumberworkers.placeholder = Currently %[1]d
+monitor.queue.settings.maxnumberworkers.error = Max number of workers must be a number
+monitor.queue.settings.submit = Update Settings
+monitor.queue.settings.changed = Settings Updated
+monitor.queue.settings.blocktimeout = Current Block Timeout
+monitor.queue.settings.blocktimeout.value = %[1]v
+
+monitor.queue.pool.none = This queue does not have a Pool
+monitor.queue.pool.added = Worker Group Added
+monitor.queue.pool.max_changed = Maximum number of workers changed
+monitor.queue.pool.workers.title = Active Worker Groups
+monitor.queue.pool.workers.none = No worker groups.
+monitor.queue.pool.cancel = Shutdown Worker Group
+monitor.queue.pool.cancelling = Worker Group shutting down
+monitor.queue.pool.cancel_notices = Shutdown this group of %s workers?
+monitor.queue.pool.cancel_desc = Leaving a queue without any worker groups may cause requests to block indefinitely.
 
 notices.system_notice_list = System Notices
 notices.view_detail_header = View Notice Details
index ccedcaf8a62e9a09f0ab4dba3b5b304c7aa0b8c8..055b8f5a5e9bd5ccc18a715444ef237d45685d56 100644 (file)
@@ -11,6 +11,7 @@ import (
        "net/url"
        "os"
        "runtime"
+       "strconv"
        "strings"
        "time"
 
@@ -22,6 +23,7 @@ import (
        "code.gitea.io/gitea/modules/graceful"
        "code.gitea.io/gitea/modules/log"
        "code.gitea.io/gitea/modules/process"
+       "code.gitea.io/gitea/modules/queue"
        "code.gitea.io/gitea/modules/setting"
        "code.gitea.io/gitea/modules/timeutil"
        "code.gitea.io/gitea/services/mailer"
@@ -35,6 +37,7 @@ const (
        tplDashboard base.TplName = "admin/dashboard"
        tplConfig    base.TplName = "admin/config"
        tplMonitor   base.TplName = "admin/monitor"
+       tplQueue     base.TplName = "admin/queue"
 )
 
 var (
@@ -355,6 +358,7 @@ func Monitor(ctx *context.Context) {
        ctx.Data["PageIsAdminMonitor"] = true
        ctx.Data["Processes"] = process.GetManager().Processes()
        ctx.Data["Entries"] = cron.ListTasks()
+       ctx.Data["Queues"] = queue.GetManager().ManagedQueues()
        ctx.HTML(200, tplMonitor)
 }
 
@@ -366,3 +370,126 @@ func MonitorCancel(ctx *context.Context) {
                "redirect": ctx.Repo.RepoLink + "/admin/monitor",
        })
 }
+
+// Queue shows details for a specific queue
+func Queue(ctx *context.Context) {
+       qid := ctx.ParamsInt64("qid")
+       mq := queue.GetManager().GetManagedQueue(qid)
+       if mq == nil {
+               ctx.Status(404)
+               return
+       }
+       ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", mq.Name)
+       ctx.Data["PageIsAdmin"] = true
+       ctx.Data["PageIsAdminMonitor"] = true
+       ctx.Data["Queue"] = mq
+       ctx.HTML(200, tplQueue)
+}
+
+// WorkerCancel cancels a worker group
+func WorkerCancel(ctx *context.Context) {
+       qid := ctx.ParamsInt64("qid")
+       mq := queue.GetManager().GetManagedQueue(qid)
+       if mq == nil {
+               ctx.Status(404)
+               return
+       }
+       pid := ctx.ParamsInt64("pid")
+       mq.CancelWorkers(pid)
+       ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.cancelling"))
+       ctx.JSON(200, map[string]interface{}{
+               "redirect": setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid),
+       })
+}
+
+// AddWorkers adds workers to a worker group
+func AddWorkers(ctx *context.Context) {
+       qid := ctx.ParamsInt64("qid")
+       mq := queue.GetManager().GetManagedQueue(qid)
+       if mq == nil {
+               ctx.Status(404)
+               return
+       }
+       number := ctx.QueryInt("number")
+       if number < 1 {
+               ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.mustnumbergreaterzero"))
+               ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+               return
+       }
+       timeout, err := time.ParseDuration(ctx.Query("timeout"))
+       if err != nil {
+               ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.musttimeoutduration"))
+               ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+               return
+       }
+       if mq.Pool == nil {
+               ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
+               ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+               return
+       }
+       mq.AddWorkers(number, timeout)
+       ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added"))
+       ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+}
+
+// SetQueueSettings sets the maximum number of workers and other settings for this queue
+func SetQueueSettings(ctx *context.Context) {
+       qid := ctx.ParamsInt64("qid")
+       mq := queue.GetManager().GetManagedQueue(qid)
+       if mq == nil {
+               ctx.Status(404)
+               return
+       }
+       if mq.Pool == nil {
+               ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
+               ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+               return
+       }
+
+       maxNumberStr := ctx.Query("max-number")
+       numberStr := ctx.Query("number")
+       timeoutStr := ctx.Query("timeout")
+
+       var err error
+       var maxNumber, number int
+       var timeout time.Duration
+       if len(maxNumberStr) > 0 {
+               maxNumber, err = strconv.Atoi(maxNumberStr)
+               if err != nil {
+                       ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.maxnumberworkers.error"))
+                       ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+                       return
+               }
+               if maxNumber < -1 {
+                       maxNumber = -1
+               }
+       } else {
+               maxNumber = mq.MaxNumberOfWorkers()
+       }
+
+       if len(numberStr) > 0 {
+               number, err = strconv.Atoi(numberStr)
+               if err != nil || number < 0 {
+                       ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.numberworkers.error"))
+                       ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+                       return
+               }
+       } else {
+               number = mq.BoostWorkers()
+       }
+
+       if len(timeoutStr) > 0 {
+               timeout, err = time.ParseDuration(timeoutStr)
+               if err != nil {
+                       ctx.Flash.Error(ctx.Tr("admin.monitor.queue.settings.timeout.error"))
+                       ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+                       return
+               }
+       } else {
+               timeout = mq.Pool.BoostTimeout()
+       }
+
+       mq.SetSettings(maxNumber, number, timeout)
+       ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
+       ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
+}
index 888c92ac4aec807aad4afa9bcbda1e00a4d49147..2f886f749dc8dad68b658cb8e36e751ce66adf30 100644 (file)
@@ -410,8 +410,16 @@ func RegisterRoutes(m *macaron.Macaron) {
                m.Get("", adminReq, admin.Dashboard)
                m.Get("/config", admin.Config)
                m.Post("/config/test_mail", admin.SendTestMail)
-               m.Get("/monitor", admin.Monitor)
-               m.Post("/monitor/cancel/:pid", admin.MonitorCancel)
+               m.Group("/monitor", func() {
+                       m.Get("", admin.Monitor)
+                       m.Post("/cancel/:pid", admin.MonitorCancel)
+                       m.Group("/queue/:qid", func() {
+                               m.Get("", admin.Queue)
+                               m.Post("/set", admin.SetQueueSettings)
+                               m.Post("/add", admin.AddWorkers)
+                               m.Post("/cancel/:pid", admin.WorkerCancel)
+                       })
+               })
 
                m.Group("/users", func() {
                        m.Get("", admin.Users)
index 38402fece2be9e1b99b2771bd77d02827986a3e5..0f9c2150b647e2c052b971e19b6821b8b1fa5422 100644 (file)
                        </table>
                </div>
 
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queues"}}
+               </h4>
+               <div class="ui attached table segment">
+                       <table class="ui very basic striped table">
+                               <thead>
+                                       <tr>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.name"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.type"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.exemplar"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.numberworkers"}}</th>
+                                               <th></th>
+                                       </tr>
+                               </thead>
+                               <tbody>
+                                       {{range .Queues}}
+                                               <tr>
+                                                       <td>{{.Name}}</td>
+                                                       <td>{{.Type}}</td>
+                                                       <td>{{.ExemplarType}}</td>
+                                                       <td>{{$sum := .NumberOfWorkers}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}}</td>
+                                                       <td><a href="{{$.Link}}/queue/{{.QID}}" class="button">{{if lt $sum 0}}{{$.i18n.Tr "admin.monitor.queue.review"}}{{else}}{{$.i18n.Tr "admin.monitor.queue.review_add"}}{{end}}</a>
+                                               </tr>
+                                       {{end}}
+                               </tbody>
+                       </table>
+               </div>
+
                <h4 class="ui top attached header">
                        {{.i18n.Tr "admin.monitor.process"}}
                </h4>
diff --git a/templates/admin/queue.tmpl b/templates/admin/queue.tmpl
new file mode 100644 (file)
index 0000000..4f42221
--- /dev/null
@@ -0,0 +1,147 @@
+{{template "base/head" .}}
+<div class="admin monitor">
+       {{template "admin/navbar" .}}
+       <div class="ui container">
+               {{template "base/alert" .}}
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queue" .Queue.Name}}
+               </h4>
+               <div class="ui attached table segment">
+                       <table class="ui very basic striped table">
+                               <thead>
+                                       <tr>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.name"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.type"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.exemplar"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.numberworkers"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.maxnumberworkers"}}</th>
+                                       </tr>
+                               </thead>
+                               <tbody>
+                                       <tr>
+                                               <td>{{.Queue.Name}}</td>
+                                               <td>{{.Queue.Type}}</td>
+                                               <td>{{.Queue.ExemplarType}}</td>
+                                               <td>{{$sum := .Queue.NumberOfWorkers}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}}</td>
+                                               <td>{{if lt $sum 0}}-{{else}}{{.Queue.MaxNumberOfWorkers}}{{end}}</td>
+                                       </tr>
+                               </tbody>
+                       </table>
+               </div>
+               {{if lt $sum 0 }}
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queue.nopool.title"}}
+               </h4>
+               <div class="ui attached segment">
+                       {{if eq .Queue.Type "wrapped" }}
+                       <p>{{.i18n.Tr "admin.monitor.queue.wrapped.desc"}}</p>
+                       {{else if eq .Queue.Type "persistable-channel"}}
+                       <p>{{.i18n.Tr "admin.monitor.queue.persistable-channel.desc"}}</p>
+                       {{else}}
+                       <p>{{.i18n.Tr "admin.monitor.queue.nopool.desc"}}</p>
+                       {{end}}
+               </div>
+               {{else}}
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queue.settings.title"}}
+               </h4>
+               <div class="ui attached segment">
+                       <p>{{.i18n.Tr "admin.monitor.queue.settings.desc"}}</p>
+                       <form method="POST" action="{{.Link}}/set">
+                               {{$.CsrfTokenHtml}}
+                               <div class="ui form">
+                                       <div class="inline field">
+                                               <label for="max-number">{{.i18n.Tr "admin.monitor.queue.settings.maxnumberworkers"}}</label>
+                                               <input name="max-number" type="text" placeholder="{{.i18n.Tr "admin.monitor.queue.settings.maxnumberworkers.placeholder" .Queue.MaxNumberOfWorkers}}">
+                                       </div>
+                                       <div class="inline field">
+                                               <label for="timeout">{{.i18n.Tr "admin.monitor.queue.settings.timeout"}}</label>
+                                               <input name="timeout" type="text" placeholder="{{.i18n.Tr "admin.monitor.queue.settings.timeout.placeholder" .Queue.BoostTimeout }}">
+                                       </div>
+                                       <div class="inline field">
+                                               <label for="number">{{.i18n.Tr "admin.monitor.queue.settings.numberworkers"}}</label>
+                                               <input name="number" type="text" placeholder="{{.i18n.Tr "admin.monitor.queue.settings.numberworkers.placeholder" .Queue.BoostWorkers}}">
+                                       </div>
+                                       <div class="inline field">
+                                               <label>{{.i18n.Tr "admin.monitor.queue.settings.blocktimeout"}}</label>
+                                               <span>{{.i18n.Tr "admin.monitor.queue.settings.blocktimeout.value" .Queue.BlockTimeout}}</span>
+                                       </div>
+                                       <button class="ui submit button">{{.i18n.Tr "admin.monitor.queue.settings.submit"}}</button>
+                               </div>
+                       </form>
+               </div>
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queue.pool.addworkers.title"}}
+               </h4>
+               <div class="ui attached segment">
+                       <p>{{.i18n.Tr "admin.monitor.queue.pool.addworkers.desc"}}</p>
+                       <form method="POST" action="{{.Link}}/add">
+                               {{$.CsrfTokenHtml}}
+                               <div class="ui form">
+                                       <div class="fields">
+                                               <div class="field">
+                                                       <label>{{.i18n.Tr "admin.monitor.queue.numberworkers"}}</label>
+                                                       <input name="number" type="text" placeholder="{{.i18n.Tr "admin.monitor.queue.pool.addworkers.numberworkers.placeholder"}}">
+                                               </div>
+                                               <div class="field">
+                                                       <label>{{.i18n.Tr "admin.monitor.queue.pool.timeout"}}</label>
+                                                       <input name="timeout" type="text" placeholder="{{.i18n.Tr "admin.monitor.queue.pool.addworkers.timeout.placeholder"}}">
+                                               </div>
+                                       </div>
+                                       <button class="ui submit button">{{.i18n.Tr "admin.monitor.queue.pool.addworkers.submit"}}</button>
+                               </div>
+                       </form>
+               </div>
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queue.pool.workers.title"}}
+               </h4>
+               <div class="ui attached table segment">
+                       <table class="ui very basic striped table">
+                               <thead>
+                                       <tr>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.numberworkers"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.start"}}</th>
+                                               <th>{{.i18n.Tr "admin.monitor.queue.pool.timeout"}}</th>
+                                               <th></th>
+                                       </tr>
+                               </thead>
+                               <tbody>
+                                       {{range .Queue.Workers}}
+                                       <tr>
+                                               <td>{{.Workers}}</td>
+                                               <td>{{DateFmtLong .Start}}</td>
+                                               <td>{{if .HasTimeout}}{{DateFmtLong .Timeout}}{{else}}-{{end}}</td>
+                                               <td>
+                                                       <a class="delete-button" href="" data-url="{{$.Link}}/cancel/{{.PID}}" data-id="{{.PID}}" data-name="{{.Workers}}"><i class="close icon text red" title="{{$.i18n.Tr "remove"}}"></i></a>
+                                               </td>
+                                       </tr>
+                                       {{else}}
+                                               <tr>
+                                                       <td colspan="4">{{.i18n.Tr "admin.monitor.queue.pool.workers.none" }}
+                                               </tr>
+                                       {{end}}
+                               </tbody>
+                       </table>
+               </div>
+               {{end}}
+               <h4 class="ui top attached header">
+                       {{.i18n.Tr "admin.monitor.queue.configuration"}}
+               </h4>
+               <div class="ui attached segment">
+                       <pre>{{.Queue.Configuration | JsonPrettyPrint}}
+               </div>
+       </div>
+</div>
+<div class="ui small basic delete modal">
+       <div class="ui icon header">
+               <i class="close icon"></i>
+               {{.i18n.Tr "admin.monitor.queue.pool.cancel"}}
+       </div>
+       <div class="content">
+               <p>{{$.i18n.Tr "admin.monitor.queue.pool.cancel_notices" `<span class="name"></span>` | Safe}}</p>
+               <p>{{$.i18n.Tr "admin.monitor.queue.pool.cancel_desc"}}</p>
+       </div>
+       {{template "base/delete_modal_actions" .}}
+</div>
+
+{{template "base/footer" .}}