Diffstat (limited to 'modules')
-rw-r--r--  modules/indexer/code/bleve.go | 4
-rw-r--r--  modules/indexer/code/elastic_search.go | 22
-rw-r--r--  modules/indexer/code/indexer.go | 55
-rw-r--r--  modules/indexer/code/wrapped.go | 10
-rw-r--r--  modules/indexer/issues/bleve.go | 4
-rw-r--r--  modules/indexer/issues/db.go | 4
-rw-r--r--  modules/indexer/issues/elastic_search.go | 22
-rw-r--r--  modules/indexer/issues/indexer.go | 73
-rw-r--r--  modules/indexer/issues/meilisearch.go | 22
-rw-r--r--  modules/indexer/stats/indexer_test.go | 2
-rw-r--r--  modules/indexer/stats/queue.go | 9
-rw-r--r--  modules/mirror/mirror.go | 6
-rw-r--r--  modules/notification/ui/ui.go | 11
-rw-r--r--  modules/queue/backoff.go | 63
-rw-r--r--  modules/queue/base.go | 42
-rw-r--r--  modules/queue/base_channel.go | 123
-rw-r--r--  modules/queue/base_channel_test.go | 11
-rw-r--r--  modules/queue/base_dummy.go | 38
-rw-r--r--  modules/queue/base_levelqueue.go | 72
-rw-r--r--  modules/queue/base_levelqueue_common.go | 92
-rw-r--r--  modules/queue/base_levelqueue_test.go | 23
-rw-r--r--  modules/queue/base_levelqueue_unique.go | 93
-rw-r--r--  modules/queue/base_redis.go | 135
-rw-r--r--  modules/queue/base_redis_test.go | 71
-rw-r--r--  modules/queue/base_test.go | 140
-rw-r--r--  modules/queue/bytefifo.go | 69
-rw-r--r--  modules/queue/config.go | 36
-rw-r--r--  modules/queue/helper.go | 91
-rw-r--r--  modules/queue/manager.go | 477
-rw-r--r--  modules/queue/manager_test.go | 124
-rw-r--r--  modules/queue/queue.go | 220
-rw-r--r--  modules/queue/queue_bytefifo.go | 419
-rw-r--r--  modules/queue/queue_channel.go | 160
-rw-r--r--  modules/queue/queue_channel_test.go | 315
-rw-r--r--  modules/queue/queue_disk.go | 124
-rw-r--r--  modules/queue/queue_disk_channel.go | 358
-rw-r--r--  modules/queue/queue_disk_channel_test.go | 544
-rw-r--r--  modules/queue/queue_disk_test.go | 147
-rw-r--r--  modules/queue/queue_redis.go | 137
-rw-r--r--  modules/queue/queue_test.go | 42
-rw-r--r--  modules/queue/queue_wrapped.go | 315
-rw-r--r--  modules/queue/setting.go | 126
-rw-r--r--  modules/queue/testhelper.go | 40
-rw-r--r--  modules/queue/unique_queue.go | 28
-rw-r--r--  modules/queue/unique_queue_channel.go | 212
-rw-r--r--  modules/queue/unique_queue_channel_test.go | 258
-rw-r--r--  modules/queue/unique_queue_disk.go | 128
-rw-r--r--  modules/queue/unique_queue_disk_channel.go | 336
-rw-r--r--  modules/queue/unique_queue_disk_channel_test.go | 265
-rw-r--r--  modules/queue/unique_queue_redis.go | 141
-rw-r--r--  modules/queue/unique_queue_wrapped.go | 174
-rw-r--r--  modules/queue/workergroup.go | 331
-rw-r--r--  modules/queue/workerpool.go | 613
-rw-r--r--  modules/queue/workerqueue.go | 241
-rw-r--r--  modules/queue/workerqueue_test.go | 260
-rw-r--r--  modules/setting/config_provider.go | 6
-rw-r--r--  modules/setting/cron_test.go | 2
-rw-r--r--  modules/setting/indexer.go | 9
-rw-r--r--  modules/setting/queue.go | 241
-rw-r--r--  modules/setting/storage_test.go | 18
-rw-r--r--  modules/test/context_tests.go | 1
-rw-r--r--  modules/testlogger/testlogger.go | 212
-rw-r--r--  modules/util/timer.go | 12
63 files changed, 2398 insertions, 5981 deletions
diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go
index e9085f4107..5936613e3a 100644
--- a/modules/indexer/code/bleve.go
+++ b/modules/indexer/code/bleve.go
@@ -273,10 +273,6 @@ func (b *BleveIndexer) Close() {
log.Info("PID: %d Repository Indexer closed", os.Getpid())
}
-// SetAvailabilityChangeCallback does nothing
-func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
-}
-
// Ping does nothing
func (b *BleveIndexer) Ping() bool {
return true
diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go
index 68c8096758..6097538009 100644
--- a/modules/indexer/code/elastic_search.go
+++ b/modules/indexer/code/elastic_search.go
@@ -42,12 +42,11 @@ var _ Indexer = &ElasticSearchIndexer{}
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
- client *elastic.Client
- indexerAliasName string
- available bool
- availabilityCallback func(bool)
- stopTimer chan struct{}
- lock sync.RWMutex
+ client *elastic.Client
+ indexerAliasName string
+ available bool
+ stopTimer chan struct{}
+ lock sync.RWMutex
}
type elasticLogger struct {
@@ -198,13 +197,6 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
return exists, nil
}
-// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
-func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
- b.lock.Lock()
- defer b.lock.Unlock()
- b.availabilityCallback = callback
-}
-
// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
@@ -529,8 +521,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
}
b.available = available
- if b.availabilityCallback != nil {
- // Call the callback from within the lock to ensure that the ordering remains correct
- b.availabilityCallback(b.available)
- }
}
diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 2c493ccf94..a5e40b52c1 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -44,7 +44,6 @@ type SearchResultLanguages struct {
// Indexer defines an interface to index and search code contents
type Indexer interface {
Ping() bool
- SetAvailabilityChangeCallback(callback func(bool))
Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
@@ -81,7 +80,7 @@ type IndexerData struct {
RepoID int64
}
-var indexerQueue queue.UniqueQueue
+var indexerQueue *queue.WorkerPoolQueue[*IndexerData]
func index(ctx context.Context, indexer Indexer, repoID int64) error {
repo, err := repo_model.GetRepositoryByID(ctx, repoID)
@@ -137,37 +136,45 @@ func Init() {
// Create the Queue
switch setting.Indexer.RepoType {
case "bleve", "elasticsearch":
- handler := func(data ...queue.Data) []queue.Data {
+ handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
idx, err := indexer.get()
if idx == nil || err != nil {
log.Error("Codes indexer handler: unable to get indexer!")
- return data
+ return items
}
- unhandled := make([]queue.Data, 0, len(data))
- for _, datum := range data {
- indexerData, ok := datum.(*IndexerData)
- if !ok {
- log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
- continue
- }
+ for _, indexerData := range items {
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
+ // FIXME: it seems there is a bug in `CatFileBatch` or `nio.Pipe`, which will cause the process to hang forever in rare cases
+ /*
+ sync.(*Cond).Wait(cond.go:70)
+ github.com/djherbis/nio/v3.(*PipeReader).Read(sync.go:106)
+ bufio.(*Reader).fill(bufio.go:106)
+ bufio.(*Reader).ReadSlice(bufio.go:372)
+ bufio.(*Reader).collectFragments(bufio.go:447)
+ bufio.(*Reader).ReadString(bufio.go:494)
+ code.gitea.io/gitea/modules/git.ReadBatchLine(batch_reader.go:149)
+ code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).addUpdate(bleve.go:214)
+ code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).Index(bleve.go:296)
+ code.gitea.io/gitea/modules/indexer/code.(*wrappedIndexer).Index(wrapped.go:74)
+ code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105)
+ */
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
- if !setting.IsInTesting {
- log.Error("indexer index error for repo %v: %v", indexerData.RepoID, err)
- }
- if indexer.Ping() {
+ if !idx.Ping() {
+ log.Error("Code indexer handler: indexer is unavailable.")
+ unhandled = append(unhandled, indexerData)
continue
}
- // Add back to queue
- unhandled = append(unhandled, datum)
+ if !setting.IsInTesting {
+ log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
+ }
}
}
return unhandled
}
- indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
+ indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
if indexerQueue == nil {
log.Fatal("Unable to create codes indexer queue")
}
@@ -224,18 +231,6 @@ func Init() {
indexer.set(rIndexer)
- if queue, ok := indexerQueue.(queue.Pausable); ok {
- rIndexer.SetAvailabilityChangeCallback(func(available bool) {
- if !available {
- log.Info("Code index queue paused")
- queue.Pause()
- } else {
- log.Info("Code index queue resumed")
- queue.Resume()
- }
- })
- }
-
// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
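
The hunk above replaces the old untyped queue.Data handler with a generic, strongly typed one: the handler receives items of the declared type and returns whatever it could not process so the worker pool re-queues them. A minimal sketch of that contract, assuming a hypothetical item type and processing function (only queue.CreateUniqueQueue, WorkerPoolQueue and graceful.GetManager().RunWithShutdownFns come from the patch):

package example

import (
	"code.gitea.io/gitea/modules/graceful"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/queue"
)

// exampleItem is a hypothetical payload; real queues define their own item structs.
type exampleItem struct {
	RepoID int64
}

// processOne is a placeholder for whatever work the handler performs.
func processOne(*exampleItem) error { return nil }

func initExampleQueue() {
	handler := func(items ...*exampleItem) (unhandled []*exampleItem) {
		for _, item := range items {
			if err := processOne(item); err != nil {
				// anything returned here is pushed back to the queue by the worker pool
				unhandled = append(unhandled, item)
			}
		}
		return unhandled
	}
	q := queue.CreateUniqueQueue("example_unique", handler) // *queue.WorkerPoolQueue[*exampleItem]
	if q == nil {
		log.Fatal("Unable to create example_unique queue")
	}
	go graceful.GetManager().RunWithShutdownFns(q.Run)
}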
diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go
index 33ba57a094..7eed3e8557 100644
--- a/modules/indexer/code/wrapped.go
+++ b/modules/indexer/code/wrapped.go
@@ -56,16 +56,6 @@ func (w *wrappedIndexer) get() (Indexer, error) {
return w.internal, nil
}
-// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
-func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
- indexer, err := w.get()
- if err != nil {
- log.Error("Failed to get indexer: %v", err)
- return
- }
- indexer.SetAvailabilityChangeCallback(callback)
-}
-
// Ping checks if elastic is available
func (w *wrappedIndexer) Ping() bool {
indexer, err := w.get()
diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go
index e3ef9af5b9..60d9ef7617 100644
--- a/modules/indexer/issues/bleve.go
+++ b/modules/indexer/issues/bleve.go
@@ -187,10 +187,6 @@ func (b *BleveIndexer) Init() (bool, error) {
return false, err
}
-// SetAvailabilityChangeCallback does nothing
-func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
-}
-
// Ping does nothing
func (b *BleveIndexer) Ping() bool {
return true
diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go
index d28b536e02..04c101c356 100644
--- a/modules/indexer/issues/db.go
+++ b/modules/indexer/issues/db.go
@@ -18,10 +18,6 @@ func (i *DBIndexer) Init() (bool, error) {
return false, nil
}
-// SetAvailabilityChangeCallback dummy function
-func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
-}
-
// Ping checks if database is available
func (i *DBIndexer) Ping() bool {
return db.GetEngine(db.DefaultContext).Ping() != nil
diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go
index ee8e3df62f..fd1dd4b452 100644
--- a/modules/indexer/issues/elastic_search.go
+++ b/modules/indexer/issues/elastic_search.go
@@ -22,12 +22,11 @@ var _ Indexer = &ElasticSearchIndexer{}
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
- client *elastic.Client
- indexerName string
- available bool
- availabilityCallback func(bool)
- stopTimer chan struct{}
- lock sync.RWMutex
+ client *elastic.Client
+ indexerName string
+ available bool
+ stopTimer chan struct{}
+ lock sync.RWMutex
}
type elasticLogger struct {
@@ -138,13 +137,6 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
return true, nil
}
-// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
-func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
- b.lock.Lock()
- defer b.lock.Unlock()
- b.availabilityCallback = callback
-}
-
// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
@@ -305,8 +297,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
}
b.available = available
- if b.availabilityCallback != nil {
- // Call the callback from within the lock to ensure that the ordering remains correct
- b.availabilityCallback(b.available)
- }
}
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 47a8b10794..e88b1b2bef 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -49,7 +49,6 @@ type SearchResult struct {
type Indexer interface {
Init() (bool, error)
Ping() bool
- SetAvailabilityChangeCallback(callback func(bool))
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
@@ -94,7 +93,7 @@ func (h *indexerHolder) get() Indexer {
var (
// issueIndexerQueue queue of issue ids to be updated
- issueIndexerQueue queue.Queue
+ issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData]
holder = newIndexerHolder()
)
@@ -108,62 +107,44 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
case "bleve", "elasticsearch", "meilisearch":
- handler := func(data ...queue.Data) []queue.Data {
+ handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
indexer := holder.get()
if indexer == nil {
- log.Error("Issue indexer handler: unable to get indexer!")
- return data
+ log.Error("Issue indexer handler: unable to get indexer.")
+ return items
}
-
- iData := make([]*IndexerData, 0, len(data))
- unhandled := make([]queue.Data, 0, len(data))
- for _, datum := range data {
- indexerData, ok := datum.(*IndexerData)
- if !ok {
- log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
- continue
- }
+ toIndex := make([]*IndexerData, 0, len(items))
+ for _, indexerData := range items {
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
if indexerData.IsDelete {
if err := indexer.Delete(indexerData.IDs...); err != nil {
- log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
- if indexer.Ping() {
- continue
+ log.Error("Issue indexer handler: failed to from index: %v Error: %v", indexerData.IDs, err)
+ if !indexer.Ping() {
+ log.Error("Issue indexer handler: indexer is unavailable when deleting")
+ unhandled = append(unhandled, indexerData)
}
- // Add back to queue
- unhandled = append(unhandled, datum)
}
continue
}
- iData = append(iData, indexerData)
+ toIndex = append(toIndex, indexerData)
}
- if len(unhandled) > 0 {
- for _, indexerData := range iData {
- unhandled = append(unhandled, indexerData)
- }
- return unhandled
- }
- if err := indexer.Index(iData); err != nil {
- log.Error("Error whilst indexing: %v Error: %v", iData, err)
- if indexer.Ping() {
- return nil
- }
- // Add back to queue
- for _, indexerData := range iData {
- unhandled = append(unhandled, indexerData)
+ if err := indexer.Index(toIndex); err != nil {
+ log.Error("Error whilst indexing: %v Error: %v", toIndex, err)
+ if !indexer.Ping() {
+ log.Error("Issue indexer handler: indexer is unavailable when indexing")
+ unhandled = append(unhandled, toIndex...)
}
- return unhandled
}
- return nil
+ return unhandled
}
- issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
+ issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)
if issueIndexerQueue == nil {
log.Fatal("Unable to create issue indexer queue")
}
default:
- issueIndexerQueue = &queue.DummyQueue{}
+ issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
}
// Create the Indexer
@@ -240,18 +221,6 @@ func InitIssueIndexer(syncReindex bool) {
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
- if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
- holder.get().SetAvailabilityChangeCallback(func(available bool) {
- if !available {
- log.Info("Issue index queue paused")
- queue.Pause()
- } else {
- log.Info("Issue index queue resumed")
- queue.Resume()
- }
- })
- }
-
// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
@@ -285,9 +254,7 @@ func InitIssueIndexer(syncReindex bool) {
case <-graceful.GetManager().IsShutdown():
log.Warn("Shutdown occurred before issue index initialisation was complete")
case <-time.After(timeout):
- if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
- shutdownable.Terminate()
- }
+ issueIndexerQueue.ShutdownWait(5 * time.Second)
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
}
}()
diff --git a/modules/indexer/issues/meilisearch.go b/modules/indexer/issues/meilisearch.go
index 319dc3e30b..990bc57a05 100644
--- a/modules/indexer/issues/meilisearch.go
+++ b/modules/indexer/issues/meilisearch.go
@@ -17,12 +17,11 @@ var _ Indexer = &MeilisearchIndexer{}
// MeilisearchIndexer implements Indexer interface
type MeilisearchIndexer struct {
- client *meilisearch.Client
- indexerName string
- available bool
- availabilityCallback func(bool)
- stopTimer chan struct{}
- lock sync.RWMutex
+ client *meilisearch.Client
+ indexerName string
+ available bool
+ stopTimer chan struct{}
+ lock sync.RWMutex
}
// MeilisearchIndexer creates a new meilisearch indexer
@@ -73,13 +72,6 @@ func (b *MeilisearchIndexer) Init() (bool, error) {
return false, b.checkError(err)
}
-// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
-func (b *MeilisearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
- b.lock.Lock()
- defer b.lock.Unlock()
- b.availabilityCallback = callback
-}
-
// Ping checks if meilisearch is available
func (b *MeilisearchIndexer) Ping() bool {
b.lock.RLock()
@@ -178,8 +170,4 @@ func (b *MeilisearchIndexer) setAvailability(available bool) {
}
b.available = available
- if b.availabilityCallback != nil {
- // Call the callback from within the lock to ensure that the ordering remains correct
- b.availabilityCallback(b.available)
- }
}
diff --git a/modules/indexer/stats/indexer_test.go b/modules/indexer/stats/indexer_test.go
index 8d9b4e36d9..be9c6659f1 100644
--- a/modules/indexer/stats/indexer_test.go
+++ b/modules/indexer/stats/indexer_test.go
@@ -41,7 +41,7 @@ func TestRepoStatsIndex(t *testing.T) {
err = UpdateRepoIndexer(repo)
assert.NoError(t, err)
- queue.GetManager().FlushAll(context.Background(), 5*time.Second)
+ assert.NoError(t, queue.GetManager().FlushAll(context.Background(), 5*time.Second))
status, err := repo_model.GetIndexerStatus(db.DefaultContext, repo, repo_model.RepoIndexerTypeStats)
assert.NoError(t, err)
diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go
index a57338e07d..46438925e4 100644
--- a/modules/indexer/stats/queue.go
+++ b/modules/indexer/stats/queue.go
@@ -14,12 +14,11 @@ import (
)
// statsQueue represents a queue to handle repository stats updates
-var statsQueue queue.UniqueQueue
+var statsQueue *queue.WorkerPoolQueue[int64]
// handle passed PR IDs and test the PRs
-func handle(data ...queue.Data) []queue.Data {
- for _, datum := range data {
- opts := datum.(int64)
+func handler(items ...int64) []int64 {
+ for _, opts := range items {
if err := indexer.Index(opts); err != nil {
if !setting.IsInTesting {
log.Error("stats queue indexer.Index(%d) failed: %v", opts, err)
@@ -30,7 +29,7 @@ func handle(data ...queue.Data) []queue.Data {
}
func initStatsQueue() error {
- statsQueue = queue.CreateUniqueQueue("repo_stats_update", handle, int64(0))
+ statsQueue = queue.CreateUniqueQueue("repo_stats_update", handler)
if statsQueue == nil {
return fmt.Errorf("Unable to create repo_stats_update Queue")
}
diff --git a/modules/mirror/mirror.go b/modules/mirror/mirror.go
index 37b4c2ac95..73e591adba 100644
--- a/modules/mirror/mirror.go
+++ b/modules/mirror/mirror.go
@@ -10,7 +10,7 @@ import (
"code.gitea.io/gitea/modules/setting"
)
-var mirrorQueue queue.UniqueQueue
+var mirrorQueue *queue.WorkerPoolQueue[*SyncRequest]
// SyncType type of sync request
type SyncType int
@@ -29,11 +29,11 @@ type SyncRequest struct {
}
// StartSyncMirrors starts a go routine to sync the mirrors
-func StartSyncMirrors(queueHandle func(data ...queue.Data) []queue.Data) {
+func StartSyncMirrors(queueHandle func(data ...*SyncRequest) []*SyncRequest) {
if !setting.Mirror.Enabled {
return
}
- mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
+ mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle)
go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}
diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go
index 73ea922748..a4576b6791 100644
--- a/modules/notification/ui/ui.go
+++ b/modules/notification/ui/ui.go
@@ -21,7 +21,7 @@ import (
type (
notificationService struct {
base.NullNotifier
- issueQueue queue.Queue
+ issueQueue *queue.WorkerPoolQueue[issueNotificationOpts]
}
issueNotificationOpts struct {
@@ -37,13 +37,12 @@ var _ base.Notifier = &notificationService{}
// NewNotifier create a new notificationService notifier
func NewNotifier() base.Notifier {
ns := &notificationService{}
- ns.issueQueue = queue.CreateQueue("notification-service", ns.handle, issueNotificationOpts{})
+ ns.issueQueue = queue.CreateSimpleQueue("notification-service", handler)
return ns
}
-func (ns *notificationService) handle(data ...queue.Data) []queue.Data {
- for _, datum := range data {
- opts := datum.(issueNotificationOpts)
+func handler(items ...issueNotificationOpts) []issueNotificationOpts {
+ for _, opts := range items {
if err := activities_model.CreateOrUpdateIssueNotifications(opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil {
log.Error("Was unable to create issue notification: %v", err)
}
@@ -52,7 +51,7 @@ func (ns *notificationService) handle(data ...queue.Data) []queue.Data {
}
func (ns *notificationService) Run() {
- graceful.GetManager().RunWithShutdownFns(ns.issueQueue.Run)
+ go graceful.GetManager().RunWithShutdownFns(ns.issueQueue.Run)
}
func (ns *notificationService) NotifyCreateIssueComment(ctx context.Context, doer *user_model.User, repo *repo_model.Repository,
diff --git a/modules/queue/backoff.go b/modules/queue/backoff.go
new file mode 100644
index 0000000000..cda7233567
--- /dev/null
+++ b/modules/queue/backoff.go
@@ -0,0 +1,63 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "time"
+)
+
+const (
+ backoffBegin = 50 * time.Millisecond
+ backoffUpper = 2 * time.Second
+)
+
+type (
+ backoffFuncRetErr[T any] func() (retry bool, ret T, err error)
+ backoffFuncErr func() (retry bool, err error)
+)
+
+func backoffRetErr[T any](ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncRetErr[T]) (ret T, err error) {
+ d := begin
+ for {
+ // check whether the context has been cancelled or has reached the deadline, return early
+ select {
+ case <-ctx.Done():
+ return ret, ctx.Err()
+ case <-end:
+ return ret, context.DeadlineExceeded
+ default:
+ }
+
+ // call the target function
+ retry, ret, err := fn()
+ if err != nil {
+ return ret, err
+ }
+ if !retry {
+ return ret, nil
+ }
+
+ // wait for a while before retrying, and also respect the context & deadline
+ select {
+ case <-ctx.Done():
+ return ret, ctx.Err()
+ case <-time.After(d):
+ d *= 2
+ if d > upper {
+ d = upper
+ }
+ case <-end:
+ return ret, context.DeadlineExceeded
+ }
+ }
+}
+
+func backoffErr(ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncErr) error {
+ _, err := backoffRetErr(ctx, begin, upper, end, func() (retry bool, ret any, err error) {
+ retry, err = fn()
+ return retry, nil, err
+ })
+ return err
+}
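
backoffRetErr and backoffErr drive every blocking push and pop in the new base queues: the callback reports whether to retry, and the helper sleeps with exponential backoff (doubling from backoffBegin up to backoffUpper) while honouring both the context and the optional end channel. A minimal sketch of a caller inside this package, assuming an illustrative in-memory store (infiniteTimerC is the never-firing timer channel used elsewhere in the package for pops without a deadline):

// illustrative only: poll a slice until an item appears, backing off between attempts
func popWithBackoff(ctx context.Context, buf *[]string) (string, error) {
	return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, ret string, err error) {
		if len(*buf) == 0 {
			return true, "", nil // nothing yet: back off and try again
		}
		ret, *buf = (*buf)[0], (*buf)[1:]
		return false, ret, nil // got an item, stop retrying
	})
}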
diff --git a/modules/queue/base.go b/modules/queue/base.go
new file mode 100644
index 0000000000..102e79e541
--- /dev/null
+++ b/modules/queue/base.go
@@ -0,0 +1,42 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "time"
+)
+
+var pushBlockTime = 5 * time.Second
+
+type baseQueue interface {
+ PushItem(ctx context.Context, data []byte) error
+ PopItem(ctx context.Context) ([]byte, error)
+ HasItem(ctx context.Context, data []byte) (bool, error)
+ Len(ctx context.Context) (int, error)
+ Close() error
+ RemoveAll(ctx context.Context) error
+}
+
+func popItemByChan(ctx context.Context, popItemFn func(ctx context.Context) ([]byte, error)) (chanItem chan []byte, chanErr chan error) {
+ chanItem = make(chan []byte)
+ chanErr = make(chan error)
+ go func() {
+ for {
+ it, err := popItemFn(ctx)
+ if err != nil {
+ close(chanItem)
+ chanErr <- err
+ return
+ }
+ if it == nil {
+ close(chanItem)
+ close(chanErr)
+ return
+ }
+ chanItem <- it
+ }
+ }()
+ return chanItem, chanErr
+}
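
popItemByChan adapts the blocking PopItem into a pair of channels so a consumer can range over items and still observe termination: chanItem is always closed first, and the terminating error (or nil, for a clean end) is then delivered on chanErr. A hypothetical consumer loop, not part of the patch:

func consumeLoop(ctx context.Context, q baseQueue, handle func([]byte)) error {
	chanItem, chanErr := popItemByChan(ctx, q.PopItem)
	for item := range chanItem {
		handle(item)
	}
	// the item channel is closed on termination; the final error (nil on a clean end,
	// or the context error after cancellation) follows on chanErr
	return <-chanErr
}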
diff --git a/modules/queue/base_channel.go b/modules/queue/base_channel.go
new file mode 100644
index 0000000000..27055faf4b
--- /dev/null
+++ b/modules/queue/base_channel.go
@@ -0,0 +1,123 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/container"
+)
+
+var errChannelClosed = errors.New("channel is closed")
+
+type baseChannel struct {
+ c chan []byte
+ set container.Set[string]
+ mu sync.Mutex
+
+ isUnique bool
+}
+
+var _ baseQueue = (*baseChannel)(nil)
+
+func newBaseChannelGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
+ q := &baseChannel{c: make(chan []byte, cfg.Length), isUnique: unique}
+ if unique {
+ q.set = container.Set[string]{}
+ }
+ return q, nil
+}
+
+func newBaseChannelSimple(cfg *BaseConfig) (baseQueue, error) {
+ return newBaseChannelGeneric(cfg, false)
+}
+
+func newBaseChannelUnique(cfg *BaseConfig) (baseQueue, error) {
+ return newBaseChannelGeneric(cfg, true)
+}
+
+func (q *baseChannel) PushItem(ctx context.Context, data []byte) error {
+ if q.c == nil {
+ return errChannelClosed
+ }
+
+ if q.isUnique {
+ q.mu.Lock()
+ has := q.set.Contains(string(data))
+ q.mu.Unlock()
+ if has {
+ return ErrAlreadyInQueue
+ }
+ }
+
+ select {
+ case q.c <- data:
+ if q.isUnique {
+ q.mu.Lock()
+ q.set.Add(string(data))
+ q.mu.Unlock()
+ }
+ return nil
+ case <-time.After(pushBlockTime):
+ return context.DeadlineExceeded
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
+func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
+ select {
+ case data, ok := <-q.c:
+ if !ok {
+ return nil, errChannelClosed
+ }
+ q.mu.Lock()
+ q.set.Remove(string(data))
+ q.mu.Unlock()
+ return data, nil
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+}
+
+func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ return q.set.Contains(string(data)), nil
+}
+
+func (q *baseChannel) Len(ctx context.Context) (int, error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ if q.c == nil {
+ return 0, errChannelClosed
+ }
+
+ return len(q.c), nil
+}
+
+func (q *baseChannel) Close() error {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ close(q.c)
+ q.set = container.Set[string]{}
+
+ return nil
+}
+
+func (q *baseChannel) RemoveAll(ctx context.Context) error {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ for q.c != nil && len(q.c) > 0 {
+ <-q.c
+ }
+ return nil
+}
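
Producers see the same error contract from the channel, LevelDB and Redis base queues: ErrAlreadyInQueue when a unique queue already contains the payload, and context.DeadlineExceeded when the queue stays full for longer than pushBlockTime. A sketch of a producer distinguishing the two (the wrapper function is illustrative; "errors" and "fmt" imports assumed):

func pushWithFeedback(ctx context.Context, q baseQueue, payload []byte) error {
	switch err := q.PushItem(ctx, payload); {
	case err == nil:
		return nil
	case errors.Is(err, ErrAlreadyInQueue):
		return nil // the unique queue already holds this payload, nothing to do
	case errors.Is(err, context.DeadlineExceeded):
		return fmt.Errorf("queue remained full for %v: %w", pushBlockTime, err)
	default:
		return err
	}
}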
diff --git a/modules/queue/base_channel_test.go b/modules/queue/base_channel_test.go
new file mode 100644
index 0000000000..5d0a2ed0a7
--- /dev/null
+++ b/modules/queue/base_channel_test.go
@@ -0,0 +1,11 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import "testing"
+
+func TestBaseChannel(t *testing.T) {
+ testQueueBasic(t, newBaseChannelSimple, &BaseConfig{ManagedName: "baseChannel", Length: 10}, false)
+ testQueueBasic(t, newBaseChannelUnique, &BaseConfig{ManagedName: "baseChannel", Length: 10}, true)
+}
diff --git a/modules/queue/base_dummy.go b/modules/queue/base_dummy.go
new file mode 100644
index 0000000000..7503568a09
--- /dev/null
+++ b/modules/queue/base_dummy.go
@@ -0,0 +1,38 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import "context"
+
+type baseDummy struct{}
+
+var _ baseQueue = (*baseDummy)(nil)
+
+func newBaseDummy(cfg *BaseConfig, unique bool) (baseQueue, error) {
+ return &baseDummy{}, nil
+}
+
+func (q *baseDummy) PushItem(ctx context.Context, data []byte) error {
+ return nil
+}
+
+func (q *baseDummy) PopItem(ctx context.Context) ([]byte, error) {
+ return nil, nil
+}
+
+func (q *baseDummy) Len(ctx context.Context) (int, error) {
+ return 0, nil
+}
+
+func (q *baseDummy) HasItem(ctx context.Context, data []byte) (bool, error) {
+ return false, nil
+}
+
+func (q *baseDummy) Close() error {
+ return nil
+}
+
+func (q *baseDummy) RemoveAll(ctx context.Context) error {
+ return nil
+}
diff --git a/modules/queue/base_levelqueue.go b/modules/queue/base_levelqueue.go
new file mode 100644
index 0000000000..afde502116
--- /dev/null
+++ b/modules/queue/base_levelqueue.go
@@ -0,0 +1,72 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+
+ "code.gitea.io/gitea/modules/nosql"
+
+ "gitea.com/lunny/levelqueue"
+)
+
+type baseLevelQueue struct {
+ internal *levelqueue.Queue
+ conn string
+ cfg *BaseConfig
+}
+
+var _ baseQueue = (*baseLevelQueue)(nil)
+
+func newBaseLevelQueueGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
+ if unique {
+ return newBaseLevelQueueUnique(cfg)
+ }
+ return newBaseLevelQueueSimple(cfg)
+}
+
+func newBaseLevelQueueSimple(cfg *BaseConfig) (baseQueue, error) {
+ conn, db, err := prepareLevelDB(cfg)
+ if err != nil {
+ return nil, err
+ }
+ q := &baseLevelQueue{conn: conn, cfg: cfg}
+ q.internal, err = levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false)
+ if err != nil {
+ return nil, err
+ }
+
+ return q, nil
+}
+
+func (q *baseLevelQueue) PushItem(ctx context.Context, data []byte) error {
+ return baseLevelQueueCommon(q.cfg, q.internal, nil).PushItem(ctx, data)
+}
+
+func (q *baseLevelQueue) PopItem(ctx context.Context) ([]byte, error) {
+ return baseLevelQueueCommon(q.cfg, q.internal, nil).PopItem(ctx)
+}
+
+func (q *baseLevelQueue) HasItem(ctx context.Context, data []byte) (bool, error) {
+ return false, nil
+}
+
+func (q *baseLevelQueue) Len(ctx context.Context) (int, error) {
+ return int(q.internal.Len()), nil
+}
+
+func (q *baseLevelQueue) Close() error {
+ err := q.internal.Close()
+ _ = nosql.GetManager().CloseLevelDB(q.conn)
+ return err
+}
+
+func (q *baseLevelQueue) RemoveAll(ctx context.Context) error {
+ for q.internal.Len() > 0 {
+ if _, err := q.internal.LPop(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/modules/queue/base_levelqueue_common.go b/modules/queue/base_levelqueue_common.go
new file mode 100644
index 0000000000..409a965517
--- /dev/null
+++ b/modules/queue/base_levelqueue_common.go
@@ -0,0 +1,92 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/nosql"
+
+ "gitea.com/lunny/levelqueue"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+type baseLevelQueuePushPoper interface {
+ RPush(data []byte) error
+ LPop() ([]byte, error)
+ Len() int64
+}
+
+type baseLevelQueueCommonImpl struct {
+ length int
+ internal baseLevelQueuePushPoper
+ mu *sync.Mutex
+}
+
+func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) error {
+ return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
+ if q.mu != nil {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ }
+
+ cnt := int(q.internal.Len())
+ if cnt >= q.length {
+ return true, nil
+ }
+ retry, err = false, q.internal.RPush(data)
+ if err == levelqueue.ErrAlreadyInQueue {
+ err = ErrAlreadyInQueue
+ }
+ return retry, err
+ })
+}
+
+func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error) {
+ return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
+ if q.mu != nil {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ }
+
+ data, err = q.internal.LPop()
+ if err == levelqueue.ErrNotFound {
+ return true, nil, nil
+ }
+ if err != nil {
+ return false, nil, err
+ }
+ return false, data, nil
+ })
+}
+
+func baseLevelQueueCommon(cfg *BaseConfig, internal baseLevelQueuePushPoper, mu *sync.Mutex) *baseLevelQueueCommonImpl {
+ return &baseLevelQueueCommonImpl{length: cfg.Length, internal: internal}
+}
+
+func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {
+ if cfg.ConnStr == "" { // use data dir as conn str
+ if !filepath.IsAbs(cfg.DataFullDir) {
+ return "", nil, fmt.Errorf("invalid leveldb data dir (not absolute): %q", cfg.DataFullDir)
+ }
+ conn = cfg.DataFullDir
+ } else {
+ if !strings.HasPrefix(cfg.ConnStr, "leveldb://") {
+ return "", nil, fmt.Errorf("invalid leveldb connection string: %q", cfg.ConnStr)
+ }
+ conn = cfg.ConnStr
+ }
+ for i := 0; i < 10; i++ {
+ if db, err = nosql.GetManager().GetLevelDB(conn); err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ }
+ return conn, db, err
+}
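
prepareLevelDB accepts either an absolute data directory (when ConnStr is empty) or a connection string that must start with "leveldb://"; anything else is rejected before a database is opened. Two illustrative BaseConfig values that would pass this validation, with hypothetical paths:

// illustrative only: the two accepted LevelDB configuration forms
func exampleLevelDBConfigs() (byDir, byConnStr *BaseConfig) {
	byDir = &BaseConfig{
		ManagedName: "example",
		DataFullDir: "/var/lib/gitea/queues/example", // must be absolute when ConnStr is empty
		Length:      100,
	}
	byConnStr = &BaseConfig{
		ManagedName: "example",
		ConnStr:     "leveldb:///var/lib/gitea/queues/example", // must start with "leveldb://"
		Length:      100,
	}
	return byDir, byConnStr
}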
diff --git a/modules/queue/base_levelqueue_test.go b/modules/queue/base_levelqueue_test.go
new file mode 100644
index 0000000000..712a0892cd
--- /dev/null
+++ b/modules/queue/base_levelqueue_test.go
@@ -0,0 +1,23 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "testing"
+
+ "code.gitea.io/gitea/modules/setting"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBaseLevelDB(t *testing.T) {
+ _, err := newBaseLevelQueueGeneric(&BaseConfig{ConnStr: "redis://"}, false)
+ assert.ErrorContains(t, err, "invalid leveldb connection string")
+
+ _, err = newBaseLevelQueueGeneric(&BaseConfig{DataFullDir: "relative"}, false)
+ assert.ErrorContains(t, err, "invalid leveldb data dir")
+
+ testQueueBasic(t, newBaseLevelQueueSimple, toBaseConfig("baseLevelQueue", setting.QueueSettings{Datadir: t.TempDir() + "/queue-test", Length: 10}), false)
+ testQueueBasic(t, newBaseLevelQueueUnique, toBaseConfig("baseLevelQueueUnique", setting.QueueSettings{ConnStr: "leveldb://" + t.TempDir() + "/queue-test", Length: 10}), true)
+}
diff --git a/modules/queue/base_levelqueue_unique.go b/modules/queue/base_levelqueue_unique.go
new file mode 100644
index 0000000000..7546221631
--- /dev/null
+++ b/modules/queue/base_levelqueue_unique.go
@@ -0,0 +1,93 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "sync"
+ "unsafe"
+
+ "code.gitea.io/gitea/modules/nosql"
+
+ "gitea.com/lunny/levelqueue"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+type baseLevelQueueUnique struct {
+ internal *levelqueue.UniqueQueue
+ conn string
+ cfg *BaseConfig
+
+ mu sync.Mutex // the levelqueue.UniqueQueue is not thread-safe, there is no mutex protecting the underlying queue&set together
+}
+
+var _ baseQueue = (*baseLevelQueueUnique)(nil)
+
+func newBaseLevelQueueUnique(cfg *BaseConfig) (baseQueue, error) {
+ conn, db, err := prepareLevelDB(cfg)
+ if err != nil {
+ return nil, err
+ }
+ q := &baseLevelQueueUnique{conn: conn, cfg: cfg}
+ q.internal, err = levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
+ if err != nil {
+ return nil, err
+ }
+
+ return q, nil
+}
+
+func (q *baseLevelQueueUnique) PushItem(ctx context.Context, data []byte) error {
+ return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PushItem(ctx, data)
+}
+
+func (q *baseLevelQueueUnique) PopItem(ctx context.Context) ([]byte, error) {
+ return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PopItem(ctx)
+}
+
+func (q *baseLevelQueueUnique) HasItem(ctx context.Context, data []byte) (bool, error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ return q.internal.Has(data)
+}
+
+func (q *baseLevelQueueUnique) Len(ctx context.Context) (int, error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ return int(q.internal.Len()), nil
+}
+
+func (q *baseLevelQueueUnique) Close() error {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ err := q.internal.Close()
+ _ = nosql.GetManager().CloseLevelDB(q.conn)
+ return err
+}
+
+func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ type levelUniqueQueue struct {
+ q *levelqueue.Queue
+ set *levelqueue.Set
+ db *leveldb.DB
+ }
+ lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))
+
+ members, err := lq.set.Members()
+ if err != nil {
+ return err // seriously corrupted
+ }
+ for _, v := range members {
+ _, _ = lq.set.Remove(v)
+ }
+ for lq.q.Len() > 0 {
+ if _, err = lq.q.LPop(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/modules/queue/base_redis.go b/modules/queue/base_redis.go
new file mode 100644
index 0000000000..a294077cc6
--- /dev/null
+++ b/modules/queue/base_redis.go
@@ -0,0 +1,135 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/nosql"
+
+ "github.com/redis/go-redis/v9"
+)
+
+type baseRedis struct {
+ client redis.UniversalClient
+ isUnique bool
+ cfg *BaseConfig
+
+ mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together
+}
+
+var _ baseQueue = (*baseRedis)(nil)
+
+func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
+ client := nosql.GetManager().GetRedisClient(cfg.ConnStr)
+
+ var err error
+ for i := 0; i < 10; i++ {
+ err = client.Ping(graceful.GetManager().ShutdownContext()).Err()
+ if err == nil {
+ break
+ }
+ log.Warn("Redis is not ready, waiting for 1 second to retry: %v", err)
+ time.Sleep(time.Second)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return &baseRedis{cfg: cfg, client: client, isUnique: unique}, nil
+}
+
+func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) {
+ return newBaseRedisGeneric(cfg, false)
+}
+
+func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) {
+ return newBaseRedisGeneric(cfg, true)
+}
+
+func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
+ return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
+ if err != nil {
+ return false, err
+ }
+ if int(cnt) >= q.cfg.Length {
+ return true, nil
+ }
+
+ if q.isUnique {
+ added, err := q.client.SAdd(ctx, q.cfg.SetFullName, data).Result()
+ if err != nil {
+ return false, err
+ }
+ if added == 0 {
+ return false, ErrAlreadyInQueue
+ }
+ }
+ return false, q.client.RPush(ctx, q.cfg.QueueFullName, data).Err()
+ })
+}
+
+func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) {
+ return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ data, err = q.client.LPop(ctx, q.cfg.QueueFullName).Bytes()
+ if err == redis.Nil {
+ return true, nil, nil
+ }
+ if err != nil {
+ return true, nil, nil
+ }
+ if q.isUnique {
+ // the data has been popped, even if there is any error we can't do anything
+ _ = q.client.SRem(ctx, q.cfg.SetFullName, data).Err()
+ }
+ return false, data, err
+ })
+}
+
+func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ if !q.isUnique {
+ return false, nil
+ }
+ return q.client.SIsMember(ctx, q.cfg.SetFullName, data).Result()
+}
+
+func (q *baseRedis) Len(ctx context.Context) (int, error) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
+ return int(cnt), err
+}
+
+func (q *baseRedis) Close() error {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ return q.client.Close()
+}
+
+func (q *baseRedis) RemoveAll(ctx context.Context) error {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ c1 := q.client.Del(ctx, q.cfg.QueueFullName)
+ c2 := q.client.Del(ctx, q.cfg.SetFullName)
+ if c1.Err() != nil {
+ return c1.Err()
+ }
+ if c2.Err() != nil {
+ return c2.Err()
+ }
+ return nil // actually, checking errors doesn't make sense here because the state could be out-of-sync
+}
diff --git a/modules/queue/base_redis_test.go b/modules/queue/base_redis_test.go
new file mode 100644
index 0000000000..3d49e8d98c
--- /dev/null
+++ b/modules/queue/base_redis_test.go
@@ -0,0 +1,71 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "os"
+ "os/exec"
+ "testing"
+ "time"
+
+ "code.gitea.io/gitea/modules/nosql"
+ "code.gitea.io/gitea/modules/setting"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func waitRedisReady(conn string, dur time.Duration) (ready bool) {
+ ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5)
+ defer cancel()
+ for t := time.Now(); ; time.Sleep(50 * time.Millisecond) {
+ ret := nosql.GetManager().GetRedisClient(conn).Ping(ctxTimed)
+ if ret.Err() == nil {
+ return true
+ }
+ if time.Since(t) > dur {
+ return false
+ }
+ }
+}
+
+func redisServerCmd(t *testing.T) *exec.Cmd {
+ redisServerProg, err := exec.LookPath("redis-server")
+ if err != nil {
+ return nil
+ }
+ c := &exec.Cmd{
+ Path: redisServerProg,
+ Args: []string{redisServerProg, "--bind", "127.0.0.1", "--port", "6379"},
+ Dir: t.TempDir(),
+ Stdin: os.Stdin,
+ Stdout: os.Stdout,
+ Stderr: os.Stderr,
+ }
+ return c
+}
+
+func TestBaseRedis(t *testing.T) {
+ var redisServer *exec.Cmd
+ defer func() {
+ if redisServer != nil {
+ _ = redisServer.Process.Signal(os.Interrupt)
+ _ = redisServer.Wait()
+ }
+ }()
+ if !waitRedisReady("redis://127.0.0.1:6379/0", 0) {
+ redisServer = redisServerCmd(t)
+ if redisServer == nil && os.Getenv("CI") != "" {
+ t.Skip("redis-server not found")
+ return
+ }
+ assert.NoError(t, redisServer.Start())
+ if !assert.True(t, waitRedisReady("redis://127.0.0.1:6379/0", 5*time.Second), "start redis-server") {
+ return
+ }
+ }
+
+ testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", setting.QueueSettings{Length: 10}), false)
+ testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", setting.QueueSettings{Length: 10}), true)
+}
diff --git a/modules/queue/base_test.go b/modules/queue/base_test.go
new file mode 100644
index 0000000000..c5bf526ae6
--- /dev/null
+++ b/modules/queue/base_test.go
@@ -0,0 +1,140 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func testQueueBasic(t *testing.T, newFn func(cfg *BaseConfig) (baseQueue, error), cfg *BaseConfig, isUnique bool) {
+ t.Run(fmt.Sprintf("testQueueBasic-%s-unique:%v", cfg.ManagedName, isUnique), func(t *testing.T) {
+ q, err := newFn(cfg)
+ assert.NoError(t, err)
+
+ ctx := context.Background()
+ _ = q.RemoveAll(ctx)
+ cnt, err := q.Len(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, 0, cnt)
+
+ // push the first item
+ err = q.PushItem(ctx, []byte("foo"))
+ assert.NoError(t, err)
+
+ cnt, err = q.Len(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, 1, cnt)
+
+ // push a duplicate item
+ err = q.PushItem(ctx, []byte("foo"))
+ if !isUnique {
+ assert.NoError(t, err)
+ } else {
+ assert.ErrorIs(t, err, ErrAlreadyInQueue)
+ }
+
+ // check the duplicate item
+ cnt, err = q.Len(ctx)
+ assert.NoError(t, err)
+ has, err := q.HasItem(ctx, []byte("foo"))
+ assert.NoError(t, err)
+ if !isUnique {
+ assert.EqualValues(t, 2, cnt)
+ assert.EqualValues(t, false, has) // non-unique queues don't check for duplicates
+ } else {
+ assert.EqualValues(t, 1, cnt)
+ assert.EqualValues(t, true, has)
+ }
+
+ // push another item
+ err = q.PushItem(ctx, []byte("bar"))
+ assert.NoError(t, err)
+
+ // pop the first item (and the duplicate if non-unique)
+ it, err := q.PopItem(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, "foo", string(it))
+
+ if !isUnique {
+ it, err = q.PopItem(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, "foo", string(it))
+ }
+
+ // pop another item
+ it, err = q.PopItem(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, "bar", string(it))
+
+ // pop an empty queue (timeout, cancel)
+ ctxTimed, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+ it, err = q.PopItem(ctxTimed)
+ assert.ErrorIs(t, err, context.DeadlineExceeded)
+ assert.Nil(t, it)
+ cancel()
+
+ ctxTimed, cancel = context.WithTimeout(ctx, 10*time.Millisecond)
+ cancel()
+ it, err = q.PopItem(ctxTimed)
+ assert.ErrorIs(t, err, context.Canceled)
+ assert.Nil(t, it)
+
+ // test blocking push if queue is full
+ for i := 0; i < cfg.Length; i++ {
+ err = q.PushItem(ctx, []byte(fmt.Sprintf("item-%d", i)))
+ assert.NoError(t, err)
+ }
+ ctxTimed, cancel = context.WithTimeout(ctx, 10*time.Millisecond)
+ err = q.PushItem(ctxTimed, []byte("item-full"))
+ assert.ErrorIs(t, err, context.DeadlineExceeded)
+ cancel()
+
+ // test blocking push if queue is full (with custom pushBlockTime)
+ oldPushBlockTime := pushBlockTime
+ timeStart := time.Now()
+ pushBlockTime = 30 * time.Millisecond
+ err = q.PushItem(ctx, []byte("item-full"))
+ assert.ErrorIs(t, err, context.DeadlineExceeded)
+ assert.True(t, time.Since(timeStart) >= pushBlockTime*2/3)
+ pushBlockTime = oldPushBlockTime
+
+ // remove all
+ cnt, err = q.Len(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, cfg.Length, cnt)
+
+ _ = q.RemoveAll(ctx)
+
+ cnt, err = q.Len(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, 0, cnt)
+ })
+}
+
+func TestBaseDummy(t *testing.T) {
+ q, err := newBaseDummy(&BaseConfig{}, true)
+ assert.NoError(t, err)
+
+ ctx := context.Background()
+ assert.NoError(t, q.PushItem(ctx, []byte("foo")))
+
+ cnt, err := q.Len(ctx)
+ assert.NoError(t, err)
+ assert.EqualValues(t, 0, cnt)
+
+ has, err := q.HasItem(ctx, []byte("foo"))
+ assert.NoError(t, err)
+ assert.False(t, has)
+
+ it, err := q.PopItem(ctx)
+ assert.NoError(t, err)
+ assert.Nil(t, it)
+
+ assert.NoError(t, q.RemoveAll(ctx))
+}
diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go
deleted file mode 100644
index c33b79426e..0000000000
--- a/modules/queue/bytefifo.go
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright 2020 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import "context"
-
-// ByteFIFO defines a FIFO that takes a byte array
-type ByteFIFO interface {
- // Len returns the length of the fifo
- Len(ctx context.Context) int64
- // PushFunc pushes data to the end of the fifo and calls the callback if it is added
- PushFunc(ctx context.Context, data []byte, fn func() error) error
- // Pop pops data from the start of the fifo
- Pop(ctx context.Context) ([]byte, error)
- // Close this fifo
- Close() error
- // PushBack pushes data back to the top of the fifo
- PushBack(ctx context.Context, data []byte) error
-}
-
-// UniqueByteFIFO defines a FIFO that Uniques its contents
-type UniqueByteFIFO interface {
- ByteFIFO
- // Has returns whether the fifo contains this data
- Has(ctx context.Context, data []byte) (bool, error)
-}
-
-var _ ByteFIFO = &DummyByteFIFO{}
-
-// DummyByteFIFO represents a dummy fifo
-type DummyByteFIFO struct{}
-
-// PushFunc returns nil
-func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
- return nil
-}
-
-// Pop returns nil
-func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) {
- return []byte{}, nil
-}
-
-// Close returns nil
-func (*DummyByteFIFO) Close() error {
- return nil
-}
-
-// Len is always 0
-func (*DummyByteFIFO) Len(ctx context.Context) int64 {
- return 0
-}
-
-// PushBack pushes data back to the top of the fifo
-func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error {
- return nil
-}
-
-var _ UniqueByteFIFO = &DummyUniqueByteFIFO{}
-
-// DummyUniqueByteFIFO represents a dummy unique fifo
-type DummyUniqueByteFIFO struct {
- DummyByteFIFO
-}
-
-// Has always returns false
-func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
- return false, nil
-}
diff --git a/modules/queue/config.go b/modules/queue/config.go
new file mode 100644
index 0000000000..c5bc16b6f0
--- /dev/null
+++ b/modules/queue/config.go
@@ -0,0 +1,36 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "code.gitea.io/gitea/modules/setting"
+)
+
+type BaseConfig struct {
+ ManagedName string
+ DataFullDir string // the caller must prepare an absolute path
+
+ ConnStr string
+ Length int
+
+ QueueFullName, SetFullName string
+}
+
+func toBaseConfig(managedName string, queueSetting setting.QueueSettings) *BaseConfig {
+ baseConfig := &BaseConfig{
+ ManagedName: managedName,
+ DataFullDir: queueSetting.Datadir,
+
+ ConnStr: queueSetting.ConnStr,
+ Length: queueSetting.Length,
+ }
+
+ // queue name and set name
+ baseConfig.QueueFullName = managedName + queueSetting.QueueName
+ baseConfig.SetFullName = baseConfig.QueueFullName + queueSetting.SetName
+ if baseConfig.SetFullName == baseConfig.QueueFullName {
+ baseConfig.SetFullName += "_unique"
+ }
+ return baseConfig
+}
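
To make the naming rule above concrete, a worked example with hypothetical settings ("fmt" import assumed; real values come from the queue configuration):

func exampleBaseConfigNaming() {
	cfg := toBaseConfig("task", setting.QueueSettings{QueueName: "_queue", SetName: "_unique", Length: 100})
	fmt.Println(cfg.QueueFullName) // "task_queue"
	fmt.Println(cfg.SetFullName)   // "task_queue_unique"

	// with empty QueueName and SetName both names would collapse to "task",
	// so toBaseConfig appends "_unique" to keep the set key distinct
	cfg = toBaseConfig("task", setting.QueueSettings{Length: 100})
	fmt.Println(cfg.QueueFullName) // "task"
	fmt.Println(cfg.SetFullName)   // "task_unique"
}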
diff --git a/modules/queue/helper.go b/modules/queue/helper.go
deleted file mode 100644
index c6fb9447b7..0000000000
--- a/modules/queue/helper.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2020 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "reflect"
-
- "code.gitea.io/gitea/modules/json"
-)
-
-// Mappable represents an interface that can MapTo another interface
-type Mappable interface {
- MapTo(v interface{}) error
-}
-
-// toConfig will attempt to convert a given configuration cfg into the provided exemplar type.
-//
-// It will tolerate the cfg being passed as a []byte or string of a json representation of the
-// exemplar or the correct type of the exemplar itself
-func toConfig(exemplar, cfg interface{}) (interface{}, error) {
- // First of all check if we've got the same type as the exemplar - if so it's all fine.
- if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
- return cfg, nil
- }
-
- // Now if not - does it provide a MapTo function we can try?
- if mappable, ok := cfg.(Mappable); ok {
- newVal := reflect.New(reflect.TypeOf(exemplar))
- if err := mappable.MapTo(newVal.Interface()); err == nil {
- return newVal.Elem().Interface(), nil
- }
- // MapTo has failed us ... let's try the json route ...
- }
-
- // OK we've been passed a byte array right?
- configBytes, ok := cfg.([]byte)
- if !ok {
- // oh ... it's a string then?
- var configStr string
-
- configStr, ok = cfg.(string)
- configBytes = []byte(configStr)
- }
- if !ok {
- // hmm ... can we marshal it to json?
- var err error
- configBytes, err = json.Marshal(cfg)
- ok = err == nil
- }
- if !ok {
- // no ... we've tried hard enough at this point - throw an error!
- return nil, ErrInvalidConfiguration{cfg: cfg}
- }
-
- // OK unmarshal the byte array into a new copy of the exemplar
- newVal := reflect.New(reflect.TypeOf(exemplar))
- if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
- // If we can't unmarshal it then return an error!
- return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
- }
- return newVal.Elem().Interface(), nil
-}
-
-// unmarshalAs will attempt to unmarshal provided bytes as the provided exemplar
-func unmarshalAs(bs []byte, exemplar interface{}) (data Data, err error) {
- if exemplar != nil {
- t := reflect.TypeOf(exemplar)
- n := reflect.New(t)
- ne := n.Elem()
- err = json.Unmarshal(bs, ne.Addr().Interface())
- data = ne.Interface().(Data)
- } else {
- err = json.Unmarshal(bs, &data)
- }
- return data, err
-}
-
-// assignableTo will check if provided data is assignable to the same type as the exemplar
-// if the provided exemplar is nil then it will always return true
-func assignableTo(data Data, exemplar interface{}) bool {
- if exemplar == nil {
- return true
- }
-
- // Assert data is of same type as exemplar
- t := reflect.TypeOf(data)
- exemplarType := reflect.TypeOf(exemplar)
-
- return t.AssignableTo(exemplarType) && data != nil
-}
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 6975e02907..03dbc72da4 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -5,457 +5,106 @@ package queue
import (
"context"
- "fmt"
- "reflect"
- "sort"
- "strings"
"sync"
"time"
- "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/setting"
)
-var manager *Manager
-
-// Manager is a queue manager
+// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
type Manager struct {
- mutex sync.Mutex
-
- counter int64
- Queues map[int64]*ManagedQueue
-}
+ mu sync.Mutex
-// ManagedQueue represents a working queue with a Pool of workers.
-//
-// Although a ManagedQueue should really represent a Queue this does not
-// necessarily have to be the case. This could be used to describe any queue.WorkerPool.
-type ManagedQueue struct {
- mutex sync.Mutex
- QID int64
- Type Type
- Name string
- Configuration interface{}
- ExemplarType string
- Managed interface{}
- counter int64
- PoolWorkers map[int64]*PoolWorkers
+ qidCounter int64
+ Queues map[int64]ManagedWorkerPoolQueue
}
-// Flushable represents a pool or queue that is flushable
-type Flushable interface {
- // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
- Flush(time.Duration) error
- // FlushWithContext is very similar to Flush
- // NB: The worker will not be registered with the manager.
- FlushWithContext(ctx context.Context) error
- // IsEmpty will return if the managed pool is empty and has no work
- IsEmpty() bool
-}
+type ManagedWorkerPoolQueue interface {
+ GetName() string
+ GetType() string
+ GetItemTypeName() string
+ GetWorkerNumber() int
+ GetWorkerActiveNumber() int
+ GetWorkerMaxNumber() int
+ SetWorkerMaxNumber(num int)
+ GetQueueItemNumber() int
-// Pausable represents a pool or queue that is Pausable
-type Pausable interface {
- // IsPaused will return if the pool or queue is paused
- IsPaused() bool
- // Pause will pause the pool or queue
- Pause()
- // Resume will resume the pool or queue
- Resume()
- // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
- IsPausedIsResumed() (paused, resumed <-chan struct{})
+ // FlushWithContext tries to make the handler process all items in the queue synchronously.
+ // It is for testing purpose only. It's not designed to be used in a cluster.
+ FlushWithContext(ctx context.Context, timeout time.Duration) error
}
-// ManagedPool is a simple interface to get certain details from a worker pool
-type ManagedPool interface {
- // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
- AddWorkers(number int, timeout time.Duration) context.CancelFunc
- // NumberOfWorkers returns the total number of workers in the pool
- NumberOfWorkers() int
- // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
- MaxNumberOfWorkers() int
- // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
- SetMaxNumberOfWorkers(int)
- // BoostTimeout returns the current timeout for worker groups created during a boost
- BoostTimeout() time.Duration
- // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
- BlockTimeout() time.Duration
- // BoostWorkers sets the number of workers to be created during a boost
- BoostWorkers() int
- // SetPoolSettings sets the user updatable settings for the pool
- SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
- // NumberInQueue returns the total number of items in the pool
- NumberInQueue() int64
- // Done returns a channel that will be closed when the Pool's baseCtx is closed
- Done() <-chan struct{}
-}
-
-// ManagedQueueList implements the sort.Interface
-type ManagedQueueList []*ManagedQueue
-
-// PoolWorkers represents a group of workers working on a queue
-type PoolWorkers struct {
- PID int64
- Workers int
- Start time.Time
- Timeout time.Time
- HasTimeout bool
- Cancel context.CancelFunc
- IsFlusher bool
-}
-
-// PoolWorkersList implements the sort.Interface for PoolWorkers
-type PoolWorkersList []*PoolWorkers
+var manager *Manager
func init() {
- _ = GetManager()
+ manager = &Manager{
+ Queues: make(map[int64]ManagedWorkerPoolQueue),
+ }
}
-// GetManager returns a Manager and initializes one as singleton if there's none yet
func GetManager() *Manager {
- if manager == nil {
- manager = &Manager{
- Queues: make(map[int64]*ManagedQueue),
- }
- }
return manager
}
-// Add adds a queue to this manager
-func (m *Manager) Add(managed interface{},
- t Type,
- configuration,
- exemplar interface{},
-) int64 {
- cfg, _ := json.Marshal(configuration)
- mq := &ManagedQueue{
- Type: t,
- Configuration: string(cfg),
- ExemplarType: reflect.TypeOf(exemplar).String(),
- PoolWorkers: make(map[int64]*PoolWorkers),
- Managed: managed,
- }
- m.mutex.Lock()
- m.counter++
- mq.QID = m.counter
- mq.Name = fmt.Sprintf("queue-%d", mq.QID)
- if named, ok := managed.(Named); ok {
- name := named.Name()
- if len(name) > 0 {
- mq.Name = name
- }
- }
- m.Queues[mq.QID] = mq
- m.mutex.Unlock()
- log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
- return mq.QID
+func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.qidCounter++
+ m.Queues[m.qidCounter] = managed
}
-// Remove a queue from the Manager
-func (m *Manager) Remove(qid int64) {
- m.mutex.Lock()
- delete(m.Queues, qid)
- m.mutex.Unlock()
- log.Trace("Queue Manager removed: QID: %d", qid)
-}
-
-// GetManagedQueue by qid
-func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
- m.mutex.Lock()
- defer m.mutex.Unlock()
+func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
+ m.mu.Lock()
+ defer m.mu.Unlock()
return m.Queues[qid]
}
-// FlushAll flushes all the flushable queues attached to this manager
-func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
- var ctx context.Context
- var cancel context.CancelFunc
- start := time.Now()
- end := start
- hasTimeout := false
- if timeout > 0 {
- ctx, cancel = context.WithTimeout(baseCtx, timeout)
- end = start.Add(timeout)
- hasTimeout = true
- } else {
- ctx, cancel = context.WithCancel(baseCtx)
- }
- defer cancel()
-
- for {
- select {
- case <-ctx.Done():
- mqs := m.ManagedQueues()
- nonEmptyQueues := []string{}
- for _, mq := range mqs {
- if !mq.IsEmpty() {
- nonEmptyQueues = append(nonEmptyQueues, mq.Name)
- }
- }
- if len(nonEmptyQueues) > 0 {
- return fmt.Errorf("flush timeout with non-empty queues: %s", strings.Join(nonEmptyQueues, ", "))
- }
- return nil
- default:
- }
- mqs := m.ManagedQueues()
- log.Debug("Found %d Managed Queues", len(mqs))
- wg := sync.WaitGroup{}
- wg.Add(len(mqs))
- allEmpty := true
- for _, mq := range mqs {
- if mq.IsEmpty() {
- wg.Done()
- continue
- }
- if pausable, ok := mq.Managed.(Pausable); ok {
- // no point flushing paused queues
- if pausable.IsPaused() {
- wg.Done()
- continue
- }
- }
- if pool, ok := mq.Managed.(ManagedPool); ok {
- // No point into flushing pools when their base's ctx is already done.
- select {
- case <-pool.Done():
- wg.Done()
- continue
- default:
- }
- }
-
- allEmpty = false
- if flushable, ok := mq.Managed.(Flushable); ok {
- log.Debug("Flushing (flushable) queue: %s", mq.Name)
- go func(q *ManagedQueue) {
- localCtx, localCtxCancel := context.WithCancel(ctx)
- pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true)
- err := flushable.FlushWithContext(localCtx)
- if err != nil && err != ctx.Err() {
- cancel()
- }
- q.CancelWorkers(pid)
- localCtxCancel()
- wg.Done()
- }(mq)
- } else {
- log.Debug("Queue: %s is non-empty but is not flushable", mq.Name)
- wg.Done()
- }
- }
- if allEmpty {
- log.Debug("All queues are empty")
- break
- }
- // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
- // but don't delay cancellation here.
- select {
- case <-ctx.Done():
- case <-time.After(100 * time.Millisecond):
- }
- wg.Wait()
- }
- return nil
-}
-
-// ManagedQueues returns the managed queues
-func (m *Manager) ManagedQueues() []*ManagedQueue {
- m.mutex.Lock()
- mqs := make([]*ManagedQueue, 0, len(m.Queues))
- for _, mq := range m.Queues {
- mqs = append(mqs, mq)
- }
- m.mutex.Unlock()
- sort.Sort(ManagedQueueList(mqs))
- return mqs
-}
-
-// Workers returns the poolworkers
-func (q *ManagedQueue) Workers() []*PoolWorkers {
- q.mutex.Lock()
- workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
- for _, worker := range q.PoolWorkers {
- workers = append(workers, worker)
- }
- q.mutex.Unlock()
- sort.Sort(PoolWorkersList(workers))
- return workers
-}
-
-// RegisterWorkers registers workers to this queue
-func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- q.counter++
- q.PoolWorkers[q.counter] = &PoolWorkers{
- PID: q.counter,
- Workers: number,
- Start: start,
- Timeout: timeout,
- HasTimeout: hasTimeout,
- Cancel: cancel,
- IsFlusher: isFlusher,
- }
- return q.counter
-}
-
-// CancelWorkers cancels pooled workers with pid
-func (q *ManagedQueue) CancelWorkers(pid int64) {
- q.mutex.Lock()
- pw, ok := q.PoolWorkers[pid]
- q.mutex.Unlock()
- if !ok {
- return
- }
- pw.Cancel()
-}
-
-// RemoveWorkers deletes pooled workers with pid
-func (q *ManagedQueue) RemoveWorkers(pid int64) {
- q.mutex.Lock()
- pw, ok := q.PoolWorkers[pid]
- delete(q.PoolWorkers, pid)
- q.mutex.Unlock()
- if ok && pw.Cancel != nil {
- pw.Cancel()
- }
-}
+func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
+ m.mu.Lock()
+ defer m.mu.Unlock()
-// AddWorkers adds workers to the queue if it has registered an add worker function
-func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
- if pool, ok := q.Managed.(ManagedPool); ok {
- // the cancel will be added to the pool workers description above
- return pool.AddWorkers(number, timeout)
+ queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
+ for k, v := range m.Queues {
+ queues[k] = v
}
- return nil
+ return queues
}
-// Flushable returns true if the queue is flushable
-func (q *ManagedQueue) Flushable() bool {
- _, ok := q.Managed.(Flushable)
- return ok
-}
-
-// Flush flushes the queue with a timeout
-func (q *ManagedQueue) Flush(timeout time.Duration) error {
- if flushable, ok := q.Managed.(Flushable); ok {
- // the cancel will be added to the pool workers description above
- return flushable.Flush(timeout)
- }
- return nil
-}
-
-// IsEmpty returns if the queue is empty
-func (q *ManagedQueue) IsEmpty() bool {
- if flushable, ok := q.Managed.(Flushable); ok {
- return flushable.IsEmpty()
- }
- return true
-}
-
-// Pausable returns whether the queue is Pausable
-func (q *ManagedQueue) Pausable() bool {
- _, ok := q.Managed.(Pausable)
- return ok
-}
-
-// Pause pauses the queue
-func (q *ManagedQueue) Pause() {
- if pausable, ok := q.Managed.(Pausable); ok {
- pausable.Pause()
- }
-}
-
-// IsPaused reveals if the queue is paused
-func (q *ManagedQueue) IsPaused() bool {
- if pausable, ok := q.Managed.(Pausable); ok {
- return pausable.IsPaused()
- }
- return false
-}
-
-// Resume resumes the queue
-func (q *ManagedQueue) Resume() {
- if pausable, ok := q.Managed.(Pausable); ok {
- pausable.Resume()
- }
-}
-
-// NumberOfWorkers returns the number of workers in the queue
-func (q *ManagedQueue) NumberOfWorkers() int {
- if pool, ok := q.Managed.(ManagedPool); ok {
- return pool.NumberOfWorkers()
- }
- return -1
-}
-
-// MaxNumberOfWorkers returns the maximum number of workers for the pool
-func (q *ManagedQueue) MaxNumberOfWorkers() int {
- if pool, ok := q.Managed.(ManagedPool); ok {
- return pool.MaxNumberOfWorkers()
+// FlushAll tries to make all managed queues process all items synchronously, until the timeout is reached or the queues are empty.
+// It is for testing purposes only. It's not designed to be used in a cluster.
+func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
+ var finalErr error
+ qs := m.ManagedQueues()
+ for _, q := range qs {
+ if err := q.FlushWithContext(ctx, timeout); err != nil {
+ finalErr = err // TODO: in Go 1.20: errors.Join
+ }
}
- return 0
+ return finalErr
}
-// BoostWorkers returns the number of workers for a boost
-func (q *ManagedQueue) BoostWorkers() int {
- if pool, ok := q.Managed.(ManagedPool); ok {
- return pool.BoostWorkers()
- }
- return -1
+// CreateSimpleQueue creates a simple queue from the global setting config provider by name
+func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
+ return createWorkerPoolQueue(name, setting.CfgProvider, handler, false)
}
-// BoostTimeout returns the timeout of the next boost
-func (q *ManagedQueue) BoostTimeout() time.Duration {
- if pool, ok := q.Managed.(ManagedPool); ok {
- return pool.BoostTimeout()
- }
- return 0
+// CreateUniqueQueue creates a unique queue from the global setting config provider by name
+func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
+ return createWorkerPoolQueue(name, setting.CfgProvider, handler, true)
}
-// BlockTimeout returns the timeout til the next boost
-func (q *ManagedQueue) BlockTimeout() time.Duration {
- if pool, ok := q.Managed.(ManagedPool); ok {
- return pool.BlockTimeout()
+func createWorkerPoolQueue[T any](name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
+ queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
+ if err != nil {
+ log.Error("Failed to get queue settings for %q: %v", name, err)
+ return nil
}
- return 0
-}
-
-// SetPoolSettings sets the setable boost values
-func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
- if pool, ok := q.Managed.(ManagedPool); ok {
- pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
+ w, err := NewWorkerPoolQueueBySetting(name, queueSetting, handler, unique)
+ if err != nil {
+ log.Error("Failed to create queue %q: %v", name, err)
+ return nil
}
-}
-
-// NumberInQueue returns the number of items in the queue
-func (q *ManagedQueue) NumberInQueue() int64 {
- if pool, ok := q.Managed.(ManagedPool); ok {
- return pool.NumberInQueue()
- }
- return -1
-}
-
-func (l ManagedQueueList) Len() int {
- return len(l)
-}
-
-func (l ManagedQueueList) Less(i, j int) bool {
- return l[i].Name < l[j].Name
-}
-
-func (l ManagedQueueList) Swap(i, j int) {
- l[i], l[j] = l[j], l[i]
-}
-
-func (l PoolWorkersList) Len() int {
- return len(l)
-}
-
-func (l PoolWorkersList) Less(i, j int) bool {
- return l[i].Start.Before(l[j].Start)
-}
-
-func (l PoolWorkersList) Swap(i, j int) {
- l[i], l[j] = l[j], l[i]
+ GetManager().AddManagedQueue(w)
+ return w
}
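Below is a minimal usage sketch (not part of the diff) of the manager API added above. The queue
name "my-task" and the payload struct are illustrative only; CreateSimpleQueue, Push and
GetManager().FlushAll come from the functions introduced in this file (Push's exact signature is
inferred from the package comment in queue.go), and starting/stopping the queue (normally wired up
by the caller) is left out of this sketch.

package example

import (
	"context"
	"time"

	"code.gitea.io/gitea/modules/queue"
)

type task struct {
	RepoID int64
}

var taskQueue *queue.WorkerPoolQueue[task]

func initQueue() {
	// The handler receives items in batches; returning nil means every item was handled.
	taskQueue = queue.CreateSimpleQueue("my-task", func(items ...task) (unhandled []task) {
		for _, item := range items {
			_ = item // process the item here
		}
		return nil
	})
}

// push enqueues one item; per the package comment, Push does not block forever but
// returns an error once the queue stays full past its timeout.
func push(repoID int64) error {
	return taskQueue.Push(task{RepoID: repoID})
}

// flushAllForTest drains every managed queue; FlushAll is documented above as a
// testing-only helper and is not meant for cluster deployments.
func flushAllForTest() error {
	return queue.GetManager().FlushAll(context.Background(), 30*time.Second)
}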
diff --git a/modules/queue/manager_test.go b/modules/queue/manager_test.go
new file mode 100644
index 0000000000..50265e27b6
--- /dev/null
+++ b/modules/queue/manager_test.go
@@ -0,0 +1,124 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "path/filepath"
+ "testing"
+
+ "code.gitea.io/gitea/modules/setting"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestManager(t *testing.T) {
+ oldAppDataPath := setting.AppDataPath
+ setting.AppDataPath = t.TempDir()
+ defer func() {
+ setting.AppDataPath = oldAppDataPath
+ }()
+
+ newQueueFromConfig := func(name, cfg string) (*WorkerPoolQueue[int], error) {
+ cfgProvider, err := setting.NewConfigProviderFromData(cfg)
+ if err != nil {
+ return nil, err
+ }
+ qs, err := setting.GetQueueSettings(cfgProvider, name)
+ if err != nil {
+ return nil, err
+ }
+ return NewWorkerPoolQueueBySetting(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
+ }
+
+ // test invalid CONN_STR
+ _, err := newQueueFromConfig("default", `
+[queue]
+DATADIR = temp-dir
+CONN_STR = redis://
+`)
+ assert.ErrorContains(t, err, "invalid leveldb connection string")
+
+ // test default config
+ q, err := newQueueFromConfig("default", "")
+ assert.NoError(t, err)
+ assert.Equal(t, "default", q.GetName())
+ assert.Equal(t, "level", q.GetType())
+ assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/common"), q.baseConfig.DataFullDir)
+ assert.Equal(t, 100, q.baseConfig.Length)
+ assert.Equal(t, 20, q.batchLength)
+ assert.Equal(t, "", q.baseConfig.ConnStr)
+ assert.Equal(t, "default_queue", q.baseConfig.QueueFullName)
+ assert.Equal(t, "default_queue_unique", q.baseConfig.SetFullName)
+ assert.Equal(t, 10, q.GetWorkerMaxNumber())
+ assert.Equal(t, 0, q.GetWorkerNumber())
+ assert.Equal(t, 0, q.GetWorkerActiveNumber())
+ assert.Equal(t, 0, q.GetQueueItemNumber())
+ assert.Equal(t, "int", q.GetItemTypeName())
+
+ // test inherited config
+ cfgProvider, err := setting.NewConfigProviderFromData(`
+[queue]
+TYPE = channel
+DATADIR = queues/dir1
+LENGTH = 100
+BATCH_LENGTH = 20
+CONN_STR = "addrs=127.0.0.1:6379 db=0"
+QUEUE_NAME = _queue1
+
+[queue.sub]
+TYPE = level
+DATADIR = queues/dir2
+LENGTH = 102
+BATCH_LENGTH = 22
+CONN_STR =
+QUEUE_NAME = _q2
+SET_NAME = _u2
+MAX_WORKERS = 2
+`)
+
+ assert.NoError(t, err)
+
+ q1 := createWorkerPoolQueue[string]("no-such", cfgProvider, nil, false)
+ assert.Equal(t, "no-such", q1.GetName())
+ assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy
+ assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataFullDir)
+ assert.Equal(t, 100, q1.baseConfig.Length)
+ assert.Equal(t, 20, q1.batchLength)
+ assert.Equal(t, "addrs=127.0.0.1:6379 db=0", q1.baseConfig.ConnStr)
+ assert.Equal(t, "no-such_queue1", q1.baseConfig.QueueFullName)
+ assert.Equal(t, "no-such_queue1_unique", q1.baseConfig.SetFullName)
+ assert.Equal(t, 10, q1.GetWorkerMaxNumber())
+ assert.Equal(t, 0, q1.GetWorkerNumber())
+ assert.Equal(t, 0, q1.GetWorkerActiveNumber())
+ assert.Equal(t, 0, q1.GetQueueItemNumber())
+ assert.Equal(t, "string", q1.GetItemTypeName())
+ qid1 := GetManager().qidCounter
+
+ q2 := createWorkerPoolQueue("sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
+ assert.Equal(t, "sub", q2.GetName())
+ assert.Equal(t, "level", q2.GetType())
+ assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataFullDir)
+ assert.Equal(t, 102, q2.baseConfig.Length)
+ assert.Equal(t, 22, q2.batchLength)
+ assert.Equal(t, "", q2.baseConfig.ConnStr)
+ assert.Equal(t, "sub_q2", q2.baseConfig.QueueFullName)
+ assert.Equal(t, "sub_q2_u2", q2.baseConfig.SetFullName)
+ assert.Equal(t, 2, q2.GetWorkerMaxNumber())
+ assert.Equal(t, 0, q2.GetWorkerNumber())
+ assert.Equal(t, 0, q2.GetWorkerActiveNumber())
+ assert.Equal(t, 0, q2.GetQueueItemNumber())
+ assert.Equal(t, "int", q2.GetItemTypeName())
+ qid2 := GetManager().qidCounter
+
+ assert.Equal(t, q1, GetManager().ManagedQueues()[qid1])
+
+ GetManager().GetManagedQueue(qid1).SetWorkerMaxNumber(120)
+ assert.Equal(t, 120, q1.workerMaxNum)
+
+ stop := runWorkerPoolQueue(q2)
+ assert.NoError(t, GetManager().GetManagedQueue(qid2).FlushWithContext(context.Background(), 0))
+ assert.NoError(t, GetManager().FlushAll(context.Background(), 0))
+ stop()
+}
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index 22ee64f8e2..0ab8dd4ae4 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -1,201 +1,31 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
+// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
-package queue
-
-import (
- "context"
- "fmt"
- "time"
-)
-
-// ErrInvalidConfiguration is called when there is invalid configuration for a queue
-type ErrInvalidConfiguration struct {
- cfg interface{}
- err error
-}
-
-func (err ErrInvalidConfiguration) Error() string {
- if err.err != nil {
- return fmt.Sprintf("Invalid Configuration Argument: %v: Error: %v", err.cfg, err.err)
- }
- return fmt.Sprintf("Invalid Configuration Argument: %v", err.cfg)
-}
-
-// IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
-func IsErrInvalidConfiguration(err error) bool {
- _, ok := err.(ErrInvalidConfiguration)
- return ok
-}
-
-// Type is a type of Queue
-type Type string
-
-// Data defines an type of queuable data
-type Data interface{}
-
-// HandlerFunc is a function that takes a variable amount of data and processes it
-type HandlerFunc func(...Data) (unhandled []Data)
-
-// NewQueueFunc is a function that creates a queue
-type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
-
-// Shutdownable represents a queue that can be shutdown
-type Shutdownable interface {
- Shutdown()
- Terminate()
-}
-
-// Named represents a queue with a name
-type Named interface {
- Name() string
-}
-
-// Queue defines an interface of a queue-like item
+// Package queue implements a specialized queue system for Gitea.
//
-// Queues will handle their own contents in the Run method
-type Queue interface {
- Flushable
- Run(atShutdown, atTerminate func(func()))
- Push(Data) error
-}
-
-// PushBackable queues can be pushed back to
-type PushBackable interface {
- // PushBack pushes data back to the top of the fifo
- PushBack(Data) error
-}
-
-// DummyQueueType is the type for the dummy queue
-const DummyQueueType Type = "dummy"
-
-// NewDummyQueue creates a new DummyQueue
-func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
- return &DummyQueue{}, nil
-}
-
-// DummyQueue represents an empty queue
-type DummyQueue struct{}
-
-// Run does nothing
-func (*DummyQueue) Run(_, _ func(func())) {}
-
-// Push fakes a push of data to the queue
-func (*DummyQueue) Push(Data) error {
- return nil
-}
-
-// PushFunc fakes a push of data to the queue with a function. The function is never run.
-func (*DummyQueue) PushFunc(Data, func() error) error {
- return nil
-}
-
-// Has always returns false as this queue never does anything
-func (*DummyQueue) Has(Data) (bool, error) {
- return false, nil
-}
-
-// Flush always returns nil
-func (*DummyQueue) Flush(time.Duration) error {
- return nil
-}
-
-// FlushWithContext always returns nil
-func (*DummyQueue) FlushWithContext(context.Context) error {
- return nil
-}
-
-// IsEmpty asserts that the queue is empty
-func (*DummyQueue) IsEmpty() bool {
- return true
-}
-
-// ImmediateType is the type to execute the function when push
-const ImmediateType Type = "immediate"
-
-// NewImmediate creates a new false queue to execute the function when push
-func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
- return &Immediate{
- handler: handler,
- }, nil
-}
-
-// Immediate represents an direct execution queue
-type Immediate struct {
- handler HandlerFunc
-}
-
-// Run does nothing
-func (*Immediate) Run(_, _ func(func())) {}
-
-// Push fakes a push of data to the queue
-func (q *Immediate) Push(data Data) error {
- return q.PushFunc(data, nil)
-}
-
-// PushFunc fakes a push of data to the queue with a function. The function is never run.
-func (q *Immediate) PushFunc(data Data, f func() error) error {
- if f != nil {
- if err := f(); err != nil {
- return err
- }
- }
- q.handler(data)
- return nil
-}
-
-// Has always returns false as this queue never does anything
-func (*Immediate) Has(Data) (bool, error) {
- return false, nil
-}
-
-// Flush always returns nil
-func (*Immediate) Flush(time.Duration) error {
- return nil
-}
-
-// FlushWithContext always returns nil
-func (*Immediate) FlushWithContext(context.Context) error {
- return nil
-}
-
-// IsEmpty asserts that the queue is empty
-func (*Immediate) IsEmpty() bool {
- return true
-}
-
-var queuesMap = map[Type]NewQueueFunc{
- DummyQueueType: NewDummyQueue,
- ImmediateType: NewImmediate,
-}
+// There are two major kinds of concepts:
+//
+// * The "base queue": channel, level, redis:
+// - They have the same abstraction, the same interface, and they are tested by the same testing code.
+// - The dummy (immediate) queue is special: it's not a real queue, it's only used as a no-op queue or a testing queue.
+//
+// * The WorkerPoolQueue: it uses the "base queue" to provide the "worker pool" function.
+// - It calls the "handler" to process the data in the base queue.
+// - Its "Push" function doesn't block forever; it returns an error if the queue is still full after the timeout.
+//
+// A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items.
+// A unique queue's "Has" function can be used to check whether an item is already in the queue,
+// although it's not 100% reliable because there is no proper transaction support.
+// A simple queue's "Has" function always returns "has=false".
+//
+// The HandlerFuncT function is called by the WorkerPoolQueue to process the data in the base queue.
+// If the handler returns "unhandled" items, they will be re-queued to the base queue after a slight delay,
+// in case the item processor (e.g. the document indexer) is not available.
+package queue
-// RegisteredTypes provides the list of requested types of queues
-func RegisteredTypes() []Type {
- types := make([]Type, len(queuesMap))
- i := 0
- for key := range queuesMap {
- types[i] = key
- i++
- }
- return types
-}
+import "code.gitea.io/gitea/modules/util"
-// RegisteredTypesAsString provides the list of requested types of queues
-func RegisteredTypesAsString() []string {
- types := make([]string, len(queuesMap))
- i := 0
- for key := range queuesMap {
- types[i] = string(key)
- i++
- }
- return types
-}
+type HandlerFuncT[T any] func(...T) (unhandled []T)
-// NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
-func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
- newFn, ok := queuesMap[queueType]
- if !ok {
- return nil, fmt.Errorf("unsupported queue type: %v", queueType)
- }
- return newFn(handlerFunc, opts, exemplar)
-}
+var ErrAlreadyInQueue = util.NewAlreadyExistErrorf("already in queue")
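The package comment above says that items a handler returns as "unhandled" are re-queued after a
slight delay. Here is a hedged sketch of such a handler, assuming hypothetical indexerAvailable and
index callbacks that stand in for a real item processor:

package example

import "code.gitea.io/gitea/modules/queue"

// makeIndexHandler builds a HandlerFuncT as described in the package comment: items that
// cannot be processed right now are handed back as "unhandled" so the WorkerPoolQueue
// re-queues them after a slight delay.
func makeIndexHandler(indexerAvailable func() bool, index func(id int64) error) queue.HandlerFuncT[int64] {
	return func(ids ...int64) (unhandled []int64) {
		if !indexerAvailable() {
			return ids // backend is down: hand everything back to be retried later
		}
		for _, id := range ids {
			if err := index(id); err != nil {
				unhandled = append(unhandled, id)
			}
		}
		return unhandled
	}
}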
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
deleted file mode 100644
index ee00a5428a..0000000000
--- a/modules/queue/queue_bytefifo.go
+++ /dev/null
@@ -1,419 +0,0 @@
-// Copyright 2020 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
- "fmt"
- "runtime/pprof"
- "sync"
- "sync/atomic"
- "time"
-
- "code.gitea.io/gitea/modules/json"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/util"
-)
-
-// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
-type ByteFIFOQueueConfiguration struct {
- WorkerPoolConfiguration
- Workers int
- WaitOnEmpty bool
-}
-
-var _ Queue = &ByteFIFOQueue{}
-
-// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
-type ByteFIFOQueue struct {
- *WorkerPool
- byteFIFO ByteFIFO
- typ Type
- shutdownCtx context.Context
- shutdownCtxCancel context.CancelFunc
- terminateCtx context.Context
- terminateCtxCancel context.CancelFunc
- exemplar interface{}
- workers int
- name string
- lock sync.Mutex
- waitOnEmpty bool
- pushed chan struct{}
-}
-
-// NewByteFIFOQueue creates a new ByteFIFOQueue
-func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
- configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(ByteFIFOQueueConfiguration)
-
- terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
- shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
-
- q := &ByteFIFOQueue{
- byteFIFO: byteFIFO,
- typ: typ,
- shutdownCtx: shutdownCtx,
- shutdownCtxCancel: shutdownCtxCancel,
- terminateCtx: terminateCtx,
- terminateCtxCancel: terminateCtxCancel,
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
- waitOnEmpty: config.WaitOnEmpty,
- pushed: make(chan struct{}, 1),
- }
- q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
- for _, unhandled := range handle(data...) {
- if fail := q.PushBack(unhandled); fail != nil {
- failed = append(failed, fail)
- }
- }
- return failed
- }, config.WorkerPoolConfiguration)
-
- return q, nil
-}
-
-// Name returns the name of this queue
-func (q *ByteFIFOQueue) Name() string {
- return q.name
-}
-
-// Push pushes data to the fifo
-func (q *ByteFIFOQueue) Push(data Data) error {
- return q.PushFunc(data, nil)
-}
-
-// PushBack pushes data to the fifo
-func (q *ByteFIFOQueue) PushBack(data Data) error {
- if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
- }
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- defer func() {
- select {
- case q.pushed <- struct{}{}:
- default:
- }
- }()
- return q.byteFIFO.PushBack(q.terminateCtx, bs)
-}
-
-// PushFunc pushes data to the fifo
-func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
- if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
- }
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- defer func() {
- select {
- case q.pushed <- struct{}{}:
- default:
- }
- }()
- return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
-}
-
-// IsEmpty checks if the queue is empty
-func (q *ByteFIFOQueue) IsEmpty() bool {
- q.lock.Lock()
- defer q.lock.Unlock()
- if !q.WorkerPool.IsEmpty() {
- return false
- }
- return q.byteFIFO.Len(q.terminateCtx) == 0
-}
-
-// NumberInQueue returns the number in the queue
-func (q *ByteFIFOQueue) NumberInQueue() int64 {
- q.lock.Lock()
- defer q.lock.Unlock()
- return q.byteFIFO.Len(q.terminateCtx) + q.WorkerPool.NumberInQueue()
-}
-
-// Flush flushes the ByteFIFOQueue
-func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
- select {
- case q.pushed <- struct{}{}:
- default:
- }
- return q.WorkerPool.Flush(timeout)
-}
-
-// Run runs the bytefifo queue
-func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
- pprof.SetGoroutineLabels(q.baseCtx)
- atShutdown(q.Shutdown)
- atTerminate(q.Terminate)
- log.Debug("%s: %s Starting", q.typ, q.name)
-
- _ = q.AddWorkers(q.workers, 0)
-
- log.Trace("%s: %s Now running", q.typ, q.name)
- q.readToChan()
-
- <-q.shutdownCtx.Done()
- log.Trace("%s: %s Waiting til done", q.typ, q.name)
- q.Wait()
-
- log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
- q.CleanUp(q.terminateCtx)
- q.terminateCtxCancel()
-}
-
-const maxBackOffTime = time.Second * 3
-
-func (q *ByteFIFOQueue) readToChan() {
- // handle quick cancels
- select {
- case <-q.shutdownCtx.Done():
- // tell the pool to shutdown.
- q.baseCtxCancel()
- return
- default:
- }
-
- // Default backoff values
- backOffTime := time.Millisecond * 100
- backOffTimer := time.NewTimer(0)
- util.StopTimer(backOffTimer)
-
- paused, _ := q.IsPausedIsResumed()
-
-loop:
- for {
- select {
- case <-paused:
- log.Trace("Queue %s pausing", q.name)
- _, resumed := q.IsPausedIsResumed()
-
- select {
- case <-resumed:
- paused, _ = q.IsPausedIsResumed()
- log.Trace("Queue %s resuming", q.name)
- if q.HasNoWorkerScaling() {
- log.Warn(
- "Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
- "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", q.name)
- q.Pause()
- continue loop
- }
- case <-q.shutdownCtx.Done():
- // tell the pool to shutdown.
- q.baseCtxCancel()
- return
- case data, ok := <-q.dataChan:
- if !ok {
- return
- }
- if err := q.PushBack(data); err != nil {
- log.Error("Unable to push back data into queue %s", q.name)
- }
- atomic.AddInt64(&q.numInQueue, -1)
- }
- default:
- }
-
- // empty the pushed channel
- select {
- case <-q.pushed:
- default:
- }
-
- err := q.doPop()
-
- util.StopTimer(backOffTimer)
-
- if err != nil {
- if err == errQueueEmpty && q.waitOnEmpty {
- log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
-
- // reset the backoff time but don't set the timer
- backOffTime = 100 * time.Millisecond
- } else if err == errUnmarshal {
- // reset the timer and backoff
- backOffTime = 100 * time.Millisecond
- backOffTimer.Reset(backOffTime)
- } else {
- // backoff
- backOffTimer.Reset(backOffTime)
- }
-
- // Need to Backoff
- select {
- case <-q.shutdownCtx.Done():
- // Oops we've been shutdown whilst backing off
- // Make sure the worker pool is shutdown too
- q.baseCtxCancel()
- return
- case <-q.pushed:
- // Data has been pushed to the fifo (or flush has been called)
- // reset the backoff time
- backOffTime = 100 * time.Millisecond
- continue loop
- case <-backOffTimer.C:
- // Calculate the next backoff time
- backOffTime += backOffTime / 2
- if backOffTime > maxBackOffTime {
- backOffTime = maxBackOffTime
- }
- continue loop
- }
- }
-
- // Reset the backoff time
- backOffTime = 100 * time.Millisecond
-
- select {
- case <-q.shutdownCtx.Done():
- // Oops we've been shutdown
- // Make sure the worker pool is shutdown too
- q.baseCtxCancel()
- return
- default:
- continue loop
- }
- }
-}
-
-var (
- errQueueEmpty = fmt.Errorf("empty queue")
- errEmptyBytes = fmt.Errorf("empty bytes")
- errUnmarshal = fmt.Errorf("failed to unmarshal")
-)
-
-func (q *ByteFIFOQueue) doPop() error {
- q.lock.Lock()
- defer q.lock.Unlock()
- bs, err := q.byteFIFO.Pop(q.shutdownCtx)
- if err != nil {
- if err == context.Canceled {
- q.baseCtxCancel()
- return err
- }
- log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
- return err
- }
- if len(bs) == 0 {
- if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 {
- return errQueueEmpty
- }
- return errEmptyBytes
- }
-
- data, err := unmarshalAs(bs, q.exemplar)
- if err != nil {
- log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
- return errUnmarshal
- }
-
- log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
- q.WorkerPool.Push(data)
- return nil
-}
-
-// Shutdown processing from this queue
-func (q *ByteFIFOQueue) Shutdown() {
- log.Trace("%s: %s Shutting down", q.typ, q.name)
- select {
- case <-q.shutdownCtx.Done():
- return
- default:
- }
- q.shutdownCtxCancel()
- log.Debug("%s: %s Shutdown", q.typ, q.name)
-}
-
-// IsShutdown returns a channel which is closed when this Queue is shutdown
-func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} {
- return q.shutdownCtx.Done()
-}
-
-// Terminate this queue and close the queue
-func (q *ByteFIFOQueue) Terminate() {
- log.Trace("%s: %s Terminating", q.typ, q.name)
- q.Shutdown()
- select {
- case <-q.terminateCtx.Done():
- return
- default:
- }
- if log.IsDebug() {
- log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx))
- }
- q.terminateCtxCancel()
- if err := q.byteFIFO.Close(); err != nil {
- log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
- }
- q.baseCtxFinished()
- log.Debug("%s: %s Terminated", q.typ, q.name)
-}
-
-// IsTerminated returns a channel which is closed when this Queue is terminated
-func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} {
- return q.terminateCtx.Done()
-}
-
-var _ UniqueQueue = &ByteFIFOUniqueQueue{}
-
-// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
-type ByteFIFOUniqueQueue struct {
- ByteFIFOQueue
-}
-
-// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
-func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
- configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(ByteFIFOQueueConfiguration)
- terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
- shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
-
- q := &ByteFIFOUniqueQueue{
- ByteFIFOQueue: ByteFIFOQueue{
- byteFIFO: byteFIFO,
- typ: typ,
- shutdownCtx: shutdownCtx,
- shutdownCtxCancel: shutdownCtxCancel,
- terminateCtx: terminateCtx,
- terminateCtxCancel: terminateCtxCancel,
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
- },
- }
- q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
- for _, unhandled := range handle(data...) {
- if fail := q.PushBack(unhandled); fail != nil {
- failed = append(failed, fail)
- }
- }
- return failed
- }, config.WorkerPoolConfiguration)
-
- return q, nil
-}
-
-// Has checks if the provided data is in the queue
-func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
- if !assignableTo(data, q.exemplar) {
- return false, fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
- }
- bs, err := json.Marshal(data)
- if err != nil {
- return false, err
- }
- return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs)
-}
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
deleted file mode 100644
index baac097393..0000000000
--- a/modules/queue/queue_channel.go
+++ /dev/null
@@ -1,160 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
- "fmt"
- "runtime/pprof"
- "sync/atomic"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-)
-
-// ChannelQueueType is the type for channel queue
-const ChannelQueueType Type = "channel"
-
-// ChannelQueueConfiguration is the configuration for a ChannelQueue
-type ChannelQueueConfiguration struct {
- WorkerPoolConfiguration
- Workers int
-}
-
-// ChannelQueue implements Queue
-//
-// A channel queue is not persistable and does not shutdown or terminate cleanly
-// It is basically a very thin wrapper around a WorkerPool
-type ChannelQueue struct {
- *WorkerPool
- shutdownCtx context.Context
- shutdownCtxCancel context.CancelFunc
- terminateCtx context.Context
- terminateCtxCancel context.CancelFunc
- exemplar interface{}
- workers int
- name string
-}
-
-// NewChannelQueue creates a memory channel queue
-func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(ChannelQueueConfiguration)
- if config.BatchLength == 0 {
- config.BatchLength = 1
- }
-
- terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
- shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
-
- queue := &ChannelQueue{
- shutdownCtx: shutdownCtx,
- shutdownCtxCancel: shutdownCtxCancel,
- terminateCtx: terminateCtx,
- terminateCtxCancel: terminateCtxCancel,
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
- }
- queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data {
- unhandled := handle(data...)
- if len(unhandled) > 0 {
- // We can only pushback to the channel if we're paused.
- if queue.IsPaused() {
- atomic.AddInt64(&queue.numInQueue, int64(len(unhandled)))
- go func() {
- for _, datum := range data {
- queue.dataChan <- datum
- }
- }()
- return nil
- }
- }
- return unhandled
- }, config.WorkerPoolConfiguration)
-
- queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
- return queue, nil
-}
-
-// Run starts to run the queue
-func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
- pprof.SetGoroutineLabels(q.baseCtx)
- atShutdown(q.Shutdown)
- atTerminate(q.Terminate)
- log.Debug("ChannelQueue: %s Starting", q.name)
- _ = q.AddWorkers(q.workers, 0)
-}
-
-// Push will push data into the queue
-func (q *ChannelQueue) Push(data Data) error {
- if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
- }
- q.WorkerPool.Push(data)
- return nil
-}
-
-// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
-func (q *ChannelQueue) Flush(timeout time.Duration) error {
- if q.IsPaused() {
- return nil
- }
- ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
- defer cancel()
- return q.FlushWithContext(ctx)
-}
-
-// Shutdown processing from this queue
-func (q *ChannelQueue) Shutdown() {
- q.lock.Lock()
- defer q.lock.Unlock()
- select {
- case <-q.shutdownCtx.Done():
- log.Trace("ChannelQueue: %s Already Shutting down", q.name)
- return
- default:
- }
- log.Trace("ChannelQueue: %s Shutting down", q.name)
- go func() {
- log.Trace("ChannelQueue: %s Flushing", q.name)
- // We can't use Cleanup here because that will close the channel
- if err := q.FlushWithContext(q.terminateCtx); err != nil {
- count := atomic.LoadInt64(&q.numInQueue)
- if count > 0 {
- log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
- }
- return
- }
- log.Debug("ChannelQueue: %s Flushed", q.name)
- }()
- q.shutdownCtxCancel()
- log.Debug("ChannelQueue: %s Shutdown", q.name)
-}
-
-// Terminate this queue and close the queue
-func (q *ChannelQueue) Terminate() {
- log.Trace("ChannelQueue: %s Terminating", q.name)
- q.Shutdown()
- select {
- case <-q.terminateCtx.Done():
- return
- default:
- }
- q.terminateCtxCancel()
- q.baseCtxFinished()
- log.Debug("ChannelQueue: %s Terminated", q.name)
-}
-
-// Name returns the name of this queue
-func (q *ChannelQueue) Name() string {
- return q.name
-}
-
-func init() {
- queuesMap[ChannelQueueType] = NewChannelQueue
-}
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
deleted file mode 100644
index f9dae742e2..0000000000
--- a/modules/queue/queue_channel_test.go
+++ /dev/null
@@ -1,315 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "os"
- "sync"
- "testing"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestChannelQueue(t *testing.T) {
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- nilFn := func(_ func()) {}
-
- queue, err := NewChannelQueue(handle,
- ChannelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 0,
- MaxWorkers: 10,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- Name: "TestChannelQueue",
- },
- Workers: 0,
- }, &testData{})
- assert.NoError(t, err)
-
- assert.Equal(t, 5, queue.(*ChannelQueue).WorkerPool.boostWorkers)
-
- go queue.Run(nilFn, nilFn)
-
- test1 := testData{"A", 1}
- go queue.Push(&test1)
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- err = queue.Push(test1)
- assert.Error(t, err)
-}
-
-func TestChannelQueue_Batch(t *testing.T) {
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- assert.True(t, len(data) == 2)
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- nilFn := func(_ func()) {}
-
- queue, err := NewChannelQueue(handle,
- ChannelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 2,
- BlockTimeout: 0,
- BoostTimeout: 0,
- BoostWorkers: 0,
- MaxWorkers: 10,
- },
- Workers: 1,
- }, &testData{})
- assert.NoError(t, err)
-
- go queue.Run(nilFn, nilFn)
-
- test1 := testData{"A", 1}
- test2 := testData{"B", 2}
-
- queue.Push(&test1)
- go queue.Push(&test2)
-
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- result2 := <-handleChan
- assert.Equal(t, test2.TestString, result2.TestString)
- assert.Equal(t, test2.TestInt, result2.TestInt)
-
- err = queue.Push(test1)
- assert.Error(t, err)
-}
-
-func TestChannelQueue_Pause(t *testing.T) {
- if os.Getenv("CI") != "" {
- t.Skip("Skipping because test is flaky on CI")
- }
- lock := sync.Mutex{}
- var queue Queue
- var err error
- pushBack := false
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- lock.Lock()
- if pushBack {
- if pausable, ok := queue.(Pausable); ok {
- pausable.Pause()
- }
- lock.Unlock()
- return data
- }
- lock.Unlock()
-
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- queueShutdown := []func(){}
- queueTerminate := []func(){}
-
- terminated := make(chan struct{})
-
- queue, err = NewChannelQueue(handle,
- ChannelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 1,
- BlockTimeout: 0,
- BoostTimeout: 0,
- BoostWorkers: 0,
- MaxWorkers: 10,
- },
- Workers: 1,
- }, &testData{})
- assert.NoError(t, err)
-
- go func() {
- queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
- close(terminated)
- }()
-
- // Shutdown and Terminate in defer
- defer func() {
- lock.Lock()
- callbacks := make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- lock.Lock()
- log.Info("Finally terminating")
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- }()
-
- test1 := testData{"A", 1}
- test2 := testData{"B", 2}
- queue.Push(&test1)
-
- pausable, ok := queue.(Pausable)
- if !assert.True(t, ok) {
- return
- }
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- pausable.Pause()
-
- paused, _ := pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue is not paused")
- return
- }
-
- queue.Push(&test2)
-
- var result2 *testData
- select {
- case result2 = <-handleChan:
- assert.Fail(t, "handler chan should be empty")
- case <-time.After(100 * time.Millisecond):
- }
-
- assert.Nil(t, result2)
-
- pausable.Resume()
- _, resumed := pausable.IsPausedIsResumed()
-
- select {
- case <-resumed:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue should be resumed")
- }
-
- select {
- case result2 = <-handleChan:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "handler chan should contain test2")
- }
-
- assert.Equal(t, test2.TestString, result2.TestString)
- assert.Equal(t, test2.TestInt, result2.TestInt)
-
- lock.Lock()
- pushBack = true
- lock.Unlock()
-
- _, resumed = pausable.IsPausedIsResumed()
-
- select {
- case <-resumed:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue is not resumed")
- return
- }
-
- queue.Push(&test1)
- paused, _ = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-handleChan:
- assert.Fail(t, "handler chan should not contain test1")
- return
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "queue should be paused")
- return
- }
-
- lock.Lock()
- pushBack = false
- lock.Unlock()
-
- paused, _ = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue is not paused")
- return
- }
-
- pausable.Resume()
- _, resumed = pausable.IsPausedIsResumed()
-
- select {
- case <-resumed:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue should be resumed")
- }
-
- select {
- case result1 = <-handleChan:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "handler chan should contain test1")
- }
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- lock.Lock()
- callbacks := make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- queueShutdown = queueShutdown[:0]
- lock.Unlock()
- // Now shutdown the queue
- for _, callback := range callbacks {
- callback()
- }
-
- // terminate the queue
- lock.Lock()
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
- queueShutdown = queueTerminate[:0]
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- select {
- case <-terminated:
- case <-time.After(10 * time.Second):
- assert.Fail(t, "Queue should have terminated")
- return
- }
-}
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
deleted file mode 100644
index fbedb8e5b9..0000000000
--- a/modules/queue/queue_disk.go
+++ /dev/null
@@ -1,124 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
-
- "code.gitea.io/gitea/modules/nosql"
-
- "gitea.com/lunny/levelqueue"
-)
-
-// LevelQueueType is the type for level queue
-const LevelQueueType Type = "level"
-
-// LevelQueueConfiguration is the configuration for a LevelQueue
-type LevelQueueConfiguration struct {
- ByteFIFOQueueConfiguration
- DataDir string
- ConnectionString string
- QueueName string
-}
-
-// LevelQueue implements a disk library queue
-type LevelQueue struct {
- *ByteFIFOQueue
-}
-
-// NewLevelQueue creates a ledis local queue
-func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(LevelQueueConfiguration)
-
- if len(config.ConnectionString) == 0 {
- config.ConnectionString = config.DataDir
- }
- config.WaitOnEmpty = true
-
- byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
- if err != nil {
- return nil, err
- }
-
- byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
- if err != nil {
- return nil, err
- }
-
- queue := &LevelQueue{
- ByteFIFOQueue: byteFIFOQueue,
- }
- queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
- return queue, nil
-}
-
-var _ ByteFIFO = &LevelQueueByteFIFO{}
-
-// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
-type LevelQueueByteFIFO struct {
- internal *levelqueue.Queue
- connection string
-}
-
-// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
-func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) {
- db, err := nosql.GetManager().GetLevelDB(connection)
- if err != nil {
- return nil, err
- }
-
- internal, err := levelqueue.NewQueue(db, []byte(prefix), false)
- if err != nil {
- return nil, err
- }
-
- return &LevelQueueByteFIFO{
- connection: connection,
- internal: internal,
- }, nil
-}
-
-// PushFunc will push data into the fifo
-func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
- if fn != nil {
- if err := fn(); err != nil {
- return err
- }
- }
- return fifo.internal.LPush(data)
-}
-
-// PushBack pushes data to the top of the fifo
-func (fifo *LevelQueueByteFIFO) PushBack(ctx context.Context, data []byte) error {
- return fifo.internal.RPush(data)
-}
-
-// Pop pops data from the start of the fifo
-func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
- data, err := fifo.internal.RPop()
- if err != nil && err != levelqueue.ErrNotFound {
- return nil, err
- }
- return data, nil
-}
-
-// Close this fifo
-func (fifo *LevelQueueByteFIFO) Close() error {
- err := fifo.internal.Close()
- _ = nosql.GetManager().CloseLevelDB(fifo.connection)
- return err
-}
-
-// Len returns the length of the fifo
-func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 {
- return fifo.internal.Len()
-}
-
-func init() {
- queuesMap[LevelQueueType] = NewLevelQueue
-}
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
deleted file mode 100644
index 91f91f0dfc..0000000000
--- a/modules/queue/queue_disk_channel.go
+++ /dev/null
@@ -1,358 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
- "fmt"
- "runtime/pprof"
- "sync"
- "sync/atomic"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-)
-
-// PersistableChannelQueueType is the type for persistable queue
-const PersistableChannelQueueType Type = "persistable-channel"
-
-// PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
-type PersistableChannelQueueConfiguration struct {
- Name string
- DataDir string
- BatchLength int
- QueueLength int
- Timeout time.Duration
- MaxAttempts int
- Workers int
- MaxWorkers int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
-}
-
-// PersistableChannelQueue wraps a channel queue and level queue together
-// The disk level queue will be used to store data at shutdown and terminate - and will be restored
-// on start up.
-type PersistableChannelQueue struct {
- channelQueue *ChannelQueue
- delayedStarter
- lock sync.Mutex
- closed chan struct{}
-}
-
-// NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
-// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
-func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(PersistableChannelQueueConfiguration)
-
- queue := &PersistableChannelQueue{
- closed: make(chan struct{}),
- }
-
- wrappedHandle := func(data ...Data) (failed []Data) {
- for _, unhandled := range handle(data...) {
- if fail := queue.PushBack(unhandled); fail != nil {
- failed = append(failed, fail)
- }
- }
- return failed
- }
-
- channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: config.QueueLength,
- BatchLength: config.BatchLength,
- BlockTimeout: config.BlockTimeout,
- BoostTimeout: config.BoostTimeout,
- BoostWorkers: config.BoostWorkers,
- MaxWorkers: config.MaxWorkers,
- Name: config.Name + "-channel",
- },
- Workers: config.Workers,
- }, exemplar)
- if err != nil {
- return nil, err
- }
-
- // the level backend only needs temporary workers to catch up with the previously dropped work
- levelCfg := LevelQueueConfiguration{
- ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: config.QueueLength,
- BatchLength: config.BatchLength,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 1,
- MaxWorkers: 5,
- Name: config.Name + "-level",
- },
- Workers: 0,
- },
- DataDir: config.DataDir,
- QueueName: config.Name + "-level",
- }
-
- levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
- if err == nil {
- queue.channelQueue = channelQueue.(*ChannelQueue)
- queue.delayedStarter = delayedStarter{
- internal: levelQueue.(*LevelQueue),
- name: config.Name,
- }
- _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
- return queue, nil
- }
- if IsErrInvalidConfiguration(err) {
- // Retrying ain't gonna make this any better...
- return nil, ErrInvalidConfiguration{cfg: cfg}
- }
-
- queue.channelQueue = channelQueue.(*ChannelQueue)
- queue.delayedStarter = delayedStarter{
- cfg: levelCfg,
- underlying: LevelQueueType,
- timeout: config.Timeout,
- maxAttempts: config.MaxAttempts,
- name: config.Name,
- }
- _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
- return queue, nil
-}
-
-// Name returns the name of this queue
-func (q *PersistableChannelQueue) Name() string {
- return q.delayedStarter.name
-}
-
-// Push will push the indexer data to queue
-func (q *PersistableChannelQueue) Push(data Data) error {
- select {
- case <-q.closed:
- return q.internal.Push(data)
- default:
- return q.channelQueue.Push(data)
- }
-}
-
-// PushBack will push the indexer data to queue
-func (q *PersistableChannelQueue) PushBack(data Data) error {
- select {
- case <-q.closed:
- if pbr, ok := q.internal.(PushBackable); ok {
- return pbr.PushBack(data)
- }
- return q.internal.Push(data)
- default:
- return q.channelQueue.Push(data)
- }
-}
-
-// Run starts to run the queue
-func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
- pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
- log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
- _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
-
- q.lock.Lock()
- if q.internal == nil {
- err := q.setInternal(atShutdown, q.channelQueue.handle, q.channelQueue.exemplar)
- q.lock.Unlock()
- if err != nil {
- log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
- return
- }
- } else {
- q.lock.Unlock()
- }
- atShutdown(q.Shutdown)
- atTerminate(q.Terminate)
-
- if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 {
- // Just run the level queue - we shut it down once it's flushed
- go q.internal.Run(func(_ func()) {}, func(_ func()) {})
- go func() {
- for !lq.IsEmpty() {
- _ = lq.Flush(0)
- select {
- case <-time.After(100 * time.Millisecond):
- case <-lq.shutdownCtx.Done():
- if lq.byteFIFO.Len(lq.terminateCtx) > 0 {
- log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
- }
- return
- }
- }
- log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name())
- q.internal.(*LevelQueue).Shutdown()
- GetManager().Remove(q.internal.(*LevelQueue).qid)
- }()
- } else {
- log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
- q.internal.(*LevelQueue).Shutdown()
- GetManager().Remove(q.internal.(*LevelQueue).qid)
- }
-}
-
-// Flush flushes the queue and blocks till the queue is empty
-func (q *PersistableChannelQueue) Flush(timeout time.Duration) error {
- var ctx context.Context
- var cancel context.CancelFunc
- if timeout > 0 {
- ctx, cancel = context.WithTimeout(context.Background(), timeout)
- } else {
- ctx, cancel = context.WithCancel(context.Background())
- }
- defer cancel()
- return q.FlushWithContext(ctx)
-}
-
-// FlushWithContext flushes the queue and blocks till the queue is empty
-func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
- errChan := make(chan error, 1)
- go func() {
- errChan <- q.channelQueue.FlushWithContext(ctx)
- }()
- go func() {
- q.lock.Lock()
- if q.internal == nil {
- q.lock.Unlock()
- errChan <- fmt.Errorf("not ready to flush internal queue %s yet", q.Name())
- return
- }
- q.lock.Unlock()
- errChan <- q.internal.FlushWithContext(ctx)
- }()
- err1 := <-errChan
- err2 := <-errChan
-
- if err1 != nil {
- return err1
- }
- return err2
-}
-
-// IsEmpty checks if a queue is empty
-func (q *PersistableChannelQueue) IsEmpty() bool {
- if !q.channelQueue.IsEmpty() {
- return false
- }
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal == nil {
- return false
- }
- return q.internal.IsEmpty()
-}
-
-// IsPaused returns if the pool is paused
-func (q *PersistableChannelQueue) IsPaused() bool {
- return q.channelQueue.IsPaused()
-}
-
-// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed
-func (q *PersistableChannelQueue) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) {
- return q.channelQueue.IsPausedIsResumed()
-}
-
-// Pause pauses the WorkerPool
-func (q *PersistableChannelQueue) Pause() {
- q.channelQueue.Pause()
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal == nil {
- return
- }
-
- pausable, ok := q.internal.(Pausable)
- if !ok {
- return
- }
- pausable.Pause()
-}
-
-// Resume resumes the WorkerPool
-func (q *PersistableChannelQueue) Resume() {
- q.channelQueue.Resume()
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal == nil {
- return
- }
-
- pausable, ok := q.internal.(Pausable)
- if !ok {
- return
- }
- pausable.Resume()
-}
-
-// Shutdown processing this queue
-func (q *PersistableChannelQueue) Shutdown() {
- log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
- q.lock.Lock()
-
- select {
- case <-q.closed:
- q.lock.Unlock()
- return
- default:
- }
- q.channelQueue.Shutdown()
- if q.internal != nil {
- q.internal.(*LevelQueue).Shutdown()
- }
- close(q.closed)
- q.lock.Unlock()
-
- log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
- q.channelQueue.baseCtxCancel()
- q.internal.(*LevelQueue).baseCtxCancel()
- log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
- q.channelQueue.Wait()
- q.internal.(*LevelQueue).Wait()
- // Redirect all remaining data in the chan to the internal channel
- log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
- close(q.channelQueue.dataChan)
- countOK, countLost := 0, 0
- for data := range q.channelQueue.dataChan {
- err := q.internal.Push(data)
- if err != nil {
- log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
- countLost++
- } else {
- countOK++
- }
- atomic.AddInt64(&q.channelQueue.numInQueue, -1)
- }
- if countLost > 0 {
- log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
- } else if countOK > 0 {
- log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
- }
- log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
-
- log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
-}
-
-// Terminate this queue and close the queue
-func (q *PersistableChannelQueue) Terminate() {
- log.Trace("PersistableChannelQueue: %s Terminating", q.delayedStarter.name)
- q.Shutdown()
- q.lock.Lock()
- defer q.lock.Unlock()
- q.channelQueue.Terminate()
- if q.internal != nil {
- q.internal.(*LevelQueue).Terminate()
- }
- log.Debug("PersistableChannelQueue: %s Terminated", q.delayedStarter.name)
-}
-
-func init() {
- queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
-}
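The removed Shutdown above drains whatever is still buffered in the channel queue into the level queue so pending items can be restored on restart. Below is a minimal, illustrative sketch of that drain pattern; the persist callback is a placeholder standing in for the level-queue Push, not the Gitea API.

```go
package main

import (
	"fmt"
	"log"
)

// drainToStore closes the in-memory channel and moves every remaining
// item into a persistent store, counting successes and losses the way
// the removed Shutdown did. Closing first is what lets range terminate
// once the buffered items are gone.
func drainToStore(ch chan string, persist func(string) error) (ok, lost int) {
	close(ch) // producers have already stopped; flush what is buffered
	for item := range ch {
		if err := persist(item); err != nil {
			log.Printf("unable to persist %q: %v", item, err)
			lost++
			continue
		}
		ok++
	}
	return ok, lost
}

func main() {
	ch := make(chan string, 4)
	ch <- "task-1"
	ch <- "task-2"
	ok, lost := drainToStore(ch, func(string) error { return nil })
	fmt.Printf("restored on restart: %d, lost: %d\n", ok, lost)
}
```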
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
deleted file mode 100644
index 4f14a5d79d..0000000000
--- a/modules/queue/queue_disk_channel_test.go
+++ /dev/null
@@ -1,544 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "sync"
- "testing"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestPersistableChannelQueue(t *testing.T) {
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- for _, datum := range data {
- if datum == nil {
- continue
- }
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- lock := sync.Mutex{}
- queueShutdown := []func(){}
- queueTerminate := []func(){}
-
- tmpDir := t.TempDir()
-
- queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 2,
- QueueLength: 20,
- Workers: 1,
- BoostWorkers: 0,
- MaxWorkers: 10,
- Name: "test-queue",
- }, &testData{})
- assert.NoError(t, err)
-
- readyForShutdown := make(chan struct{})
- readyForTerminate := make(chan struct{})
-
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- select {
- case <-readyForShutdown:
- default:
- close(readyForShutdown)
- }
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- select {
- case <-readyForTerminate:
- default:
- close(readyForTerminate)
- }
- queueTerminate = append(queueTerminate, terminate)
- })
-
- test1 := testData{"A", 1}
- test2 := testData{"B", 2}
-
- err = queue.Push(&test1)
- assert.NoError(t, err)
- go func() {
- err := queue.Push(&test2)
- assert.NoError(t, err)
- }()
-
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- result2 := <-handleChan
- assert.Equal(t, test2.TestString, result2.TestString)
- assert.Equal(t, test2.TestInt, result2.TestInt)
-
- // test1 is a testData not a *testData so will be rejected
- err = queue.Push(test1)
- assert.Error(t, err)
-
- <-readyForShutdown
- // Now shutdown the queue
- lock.Lock()
- callbacks := make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
-
- // Wait til it is closed
- <-queue.(*PersistableChannelQueue).closed
-
- err = queue.Push(&test1)
- assert.NoError(t, err)
- err = queue.Push(&test2)
- assert.NoError(t, err)
- select {
- case <-handleChan:
- assert.Fail(t, "Handler processing should have stopped")
- default:
- }
-
- // terminate the queue
- <-readyForTerminate
- lock.Lock()
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
-
- select {
- case <-handleChan:
- assert.Fail(t, "Handler processing should have stopped")
- default:
- }
-
- // Reopen queue
- queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 2,
- QueueLength: 20,
- Workers: 1,
- BoostWorkers: 0,
- MaxWorkers: 10,
- Name: "test-queue",
- }, &testData{})
- assert.NoError(t, err)
-
- readyForShutdown = make(chan struct{})
- readyForTerminate = make(chan struct{})
-
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- select {
- case <-readyForShutdown:
- default:
- close(readyForShutdown)
- }
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- select {
- case <-readyForTerminate:
- default:
- close(readyForTerminate)
- }
- queueTerminate = append(queueTerminate, terminate)
- })
-
- result3 := <-handleChan
- assert.Equal(t, test1.TestString, result3.TestString)
- assert.Equal(t, test1.TestInt, result3.TestInt)
-
- result4 := <-handleChan
- assert.Equal(t, test2.TestString, result4.TestString)
- assert.Equal(t, test2.TestInt, result4.TestInt)
-
- <-readyForShutdown
- lock.Lock()
- callbacks = make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- <-readyForTerminate
- lock.Lock()
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
-}
-
-func TestPersistableChannelQueue_Pause(t *testing.T) {
- lock := sync.Mutex{}
- var queue Queue
- var err error
- pushBack := false
-
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- lock.Lock()
- if pushBack {
- if pausable, ok := queue.(Pausable); ok {
- log.Info("pausing")
- pausable.Pause()
- }
- lock.Unlock()
- return data
- }
- lock.Unlock()
-
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- queueShutdown := []func(){}
- queueTerminate := []func(){}
- terminated := make(chan struct{})
-
- tmpDir := t.TempDir()
-
- queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 2,
- QueueLength: 20,
- Workers: 1,
- BoostWorkers: 0,
- MaxWorkers: 10,
- Name: "test-queue",
- }, &testData{})
- assert.NoError(t, err)
-
- go func() {
- queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
- close(terminated)
- }()
-
- // Shutdown and Terminate in defer
- defer func() {
- lock.Lock()
- callbacks := make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- lock.Lock()
- log.Info("Finally terminating")
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
- }()
-
- test1 := testData{"A", 1}
- test2 := testData{"B", 2}
-
- err = queue.Push(&test1)
- assert.NoError(t, err)
-
- pausable, ok := queue.(Pausable)
- if !assert.True(t, ok) {
- return
- }
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- pausable.Pause()
- paused, _ := pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue is not paused")
- return
- }
-
- queue.Push(&test2)
-
- var result2 *testData
- select {
- case result2 = <-handleChan:
- assert.Fail(t, "handler chan should be empty")
- case <-time.After(100 * time.Millisecond):
- }
-
- assert.Nil(t, result2)
-
- pausable.Resume()
- _, resumed := pausable.IsPausedIsResumed()
-
- select {
- case <-resumed:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue should be resumed")
- return
- }
-
- select {
- case result2 = <-handleChan:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "handler chan should contain test2")
- }
-
- assert.Equal(t, test2.TestString, result2.TestString)
- assert.Equal(t, test2.TestInt, result2.TestInt)
-
-	// Set pushBack so that the next handle will result in a Pause
- lock.Lock()
- pushBack = true
- lock.Unlock()
-
- // Ensure that we're still resumed
- _, resumed = pausable.IsPausedIsResumed()
-
- select {
- case <-resumed:
- case <-time.After(100 * time.Millisecond):
- assert.Fail(t, "Queue is not resumed")
- return
- }
-
- // push test1
- queue.Push(&test1)
-
- // Now as this is handled it should pause
- paused, _ = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-handleChan:
- assert.Fail(t, "handler chan should not contain test1")
- return
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "queue should be paused")
- return
- }
-
- lock.Lock()
- pushBack = false
- lock.Unlock()
-
- pausable.Resume()
-
- _, resumed = pausable.IsPausedIsResumed()
- select {
- case <-resumed:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "Queue should be resumed")
- return
- }
-
- select {
- case result1 = <-handleChan:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "handler chan should contain test1")
- return
- }
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- lock.Lock()
- callbacks := make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- queueShutdown = queueShutdown[:0]
- lock.Unlock()
- // Now shutdown the queue
- for _, callback := range callbacks {
- callback()
- }
-
- // Wait til it is closed
- select {
- case <-queue.(*PersistableChannelQueue).closed:
- case <-time.After(5 * time.Second):
- assert.Fail(t, "queue should close")
- return
- }
-
- err = queue.Push(&test1)
- assert.NoError(t, err)
- err = queue.Push(&test2)
- assert.NoError(t, err)
- select {
- case <-handleChan:
- assert.Fail(t, "Handler processing should have stopped")
- return
- default:
- }
-
- // terminate the queue
- lock.Lock()
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
-	queueTerminate = queueTerminate[:0]
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
-
- select {
- case <-handleChan:
- assert.Fail(t, "Handler processing should have stopped")
- return
- case <-terminated:
- case <-time.After(10 * time.Second):
- assert.Fail(t, "Queue should have terminated")
- return
- }
-
- lock.Lock()
- pushBack = true
- lock.Unlock()
-
- // Reopen queue
- terminated = make(chan struct{})
- queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
- DataDir: tmpDir,
- BatchLength: 1,
- QueueLength: 20,
- Workers: 1,
- BoostWorkers: 0,
- MaxWorkers: 10,
- Name: "test-queue",
- }, &testData{})
- assert.NoError(t, err)
- pausable, ok = queue.(Pausable)
- if !assert.True(t, ok) {
- return
- }
-
- paused, _ = pausable.IsPausedIsResumed()
-
- go func() {
- queue.Run(func(shutdown func()) {
- lock.Lock()
- defer lock.Unlock()
- queueShutdown = append(queueShutdown, shutdown)
- }, func(terminate func()) {
- lock.Lock()
- defer lock.Unlock()
- queueTerminate = append(queueTerminate, terminate)
- })
- close(terminated)
- }()
-
- select {
- case <-handleChan:
- assert.Fail(t, "Handler processing should have stopped")
- return
- case <-paused:
- }
-
- paused, _ = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "Queue is not paused")
- return
- }
-
- select {
- case <-handleChan:
- assert.Fail(t, "Handler processing should have stopped")
- return
- default:
- }
-
- lock.Lock()
- pushBack = false
- lock.Unlock()
-
- pausable.Resume()
- _, resumed = pausable.IsPausedIsResumed()
- select {
- case <-resumed:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "Queue should be resumed")
- return
- }
-
- var result3, result4 *testData
-
- select {
- case result3 = <-handleChan:
- case <-time.After(1 * time.Second):
- assert.Fail(t, "Handler processing should have resumed")
- return
- }
- select {
- case result4 = <-handleChan:
- case <-time.After(1 * time.Second):
- assert.Fail(t, "Handler processing should have resumed")
- return
- }
- if result4.TestString == test1.TestString {
- result3, result4 = result4, result3
- }
- assert.Equal(t, test1.TestString, result3.TestString)
- assert.Equal(t, test1.TestInt, result3.TestInt)
-
- assert.Equal(t, test2.TestString, result4.TestString)
- assert.Equal(t, test2.TestInt, result4.TestInt)
-
- lock.Lock()
- callbacks = make([]func(), len(queueShutdown))
- copy(callbacks, queueShutdown)
- queueShutdown = queueShutdown[:0]
- lock.Unlock()
- // Now shutdown the queue
- for _, callback := range callbacks {
- callback()
- }
-
- // terminate the queue
- lock.Lock()
- callbacks = make([]func(), len(queueTerminate))
- copy(callbacks, queueTerminate)
-	queueTerminate = queueTerminate[:0]
- lock.Unlock()
- for _, callback := range callbacks {
- callback()
- }
-
- select {
- case <-time.After(10 * time.Second):
- assert.Fail(t, "Queue should have terminated")
- return
- case <-terminated:
- }
-}
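The deleted test above drives Pause and Resume through IsPausedIsResumed, which returns one channel that is closed while the queue is paused and one that is closed while it is running, so callers can select on the state they are waiting for. A minimal illustrative implementation of that two-channel pattern (not the Gitea WorkerPool itself):

```go
package main

import (
	"fmt"
	"sync"
)

// pauser keeps exactly one of its two channels closed at any time.
type pauser struct {
	mu      sync.Mutex
	paused  chan struct{} // closed while paused
	resumed chan struct{} // closed while running
}

func newPauser() *pauser {
	p := &pauser{paused: make(chan struct{}), resumed: make(chan struct{})}
	close(p.resumed) // start in the running state
	return p
}

func (p *pauser) Pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	select {
	case <-p.paused: // already paused
		return
	default:
	}
	close(p.paused)
	p.resumed = make(chan struct{})
}

func (p *pauser) Resume() {
	p.mu.Lock()
	defer p.mu.Unlock()
	select {
	case <-p.resumed: // already running
		return
	default:
	}
	close(p.resumed)
	p.paused = make(chan struct{})
}

func (p *pauser) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.paused, p.resumed
}

func main() {
	p := newPauser()
	p.Pause()
	paused, _ := p.IsPausedIsResumed()
	select {
	case <-paused:
		fmt.Println("queue is paused")
	default:
		fmt.Println("queue is running")
	}
}
```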
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
deleted file mode 100644
index 8f83abf42c..0000000000
--- a/modules/queue/queue_disk_test.go
+++ /dev/null
@@ -1,147 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "sync"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestLevelQueue(t *testing.T) {
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- assert.True(t, len(data) == 2)
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- var lock sync.Mutex
- queueShutdown := []func(){}
- queueTerminate := []func(){}
-
- tmpDir := t.TempDir()
-
- queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
- ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 2,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- MaxWorkers: 10,
- },
- Workers: 1,
- },
- DataDir: tmpDir,
- }, &testData{})
- assert.NoError(t, err)
-
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- queueShutdown = append(queueShutdown, shutdown)
- lock.Unlock()
- }, func(terminate func()) {
- lock.Lock()
- queueTerminate = append(queueTerminate, terminate)
- lock.Unlock()
- })
-
- test1 := testData{"A", 1}
- test2 := testData{"B", 2}
-
- err = queue.Push(&test1)
- assert.NoError(t, err)
- go func() {
- err := queue.Push(&test2)
- assert.NoError(t, err)
- }()
-
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- result2 := <-handleChan
- assert.Equal(t, test2.TestString, result2.TestString)
- assert.Equal(t, test2.TestInt, result2.TestInt)
-
- err = queue.Push(test1)
- assert.Error(t, err)
-
- lock.Lock()
- for _, callback := range queueShutdown {
- callback()
- }
- lock.Unlock()
-
- time.Sleep(200 * time.Millisecond)
- err = queue.Push(&test1)
- assert.NoError(t, err)
- err = queue.Push(&test2)
- assert.NoError(t, err)
- select {
- case <-handleChan:
- assert.Fail(t, "Handler processing should have stopped")
- default:
- }
- lock.Lock()
- for _, callback := range queueTerminate {
- callback()
- }
- lock.Unlock()
-
- // Reopen queue
- queue, err = NewWrappedQueue(handle,
- WrappedQueueConfiguration{
- Underlying: LevelQueueType,
- Config: LevelQueueConfiguration{
- ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 2,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- MaxWorkers: 10,
- },
- Workers: 1,
- },
- DataDir: tmpDir,
- },
- }, &testData{})
- assert.NoError(t, err)
-
- go queue.Run(func(shutdown func()) {
- lock.Lock()
- queueShutdown = append(queueShutdown, shutdown)
- lock.Unlock()
- }, func(terminate func()) {
- lock.Lock()
- queueTerminate = append(queueTerminate, terminate)
- lock.Unlock()
- })
-
- result3 := <-handleChan
- assert.Equal(t, test1.TestString, result3.TestString)
- assert.Equal(t, test1.TestInt, result3.TestInt)
-
- result4 := <-handleChan
- assert.Equal(t, test2.TestString, result4.TestString)
- assert.Equal(t, test2.TestInt, result4.TestInt)
-
- lock.Lock()
- for _, callback := range queueShutdown {
- callback()
- }
- for _, callback := range queueTerminate {
- callback()
- }
- lock.Unlock()
-}
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
deleted file mode 100644
index f8842fea9f..0000000000
--- a/modules/queue/queue_redis.go
+++ /dev/null
@@ -1,137 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
-
- "code.gitea.io/gitea/modules/graceful"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/nosql"
-
- "github.com/redis/go-redis/v9"
-)
-
-// RedisQueueType is the type for redis queue
-const RedisQueueType Type = "redis"
-
-// RedisQueueConfiguration is the configuration for the redis queue
-type RedisQueueConfiguration struct {
- ByteFIFOQueueConfiguration
- RedisByteFIFOConfiguration
-}
-
-// RedisQueue redis queue
-type RedisQueue struct {
- *ByteFIFOQueue
-}
-
-// NewRedisQueue creates single redis or cluster redis queue
-func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(RedisQueueConfiguration)
-
- byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
- if err != nil {
- return nil, err
- }
-
- byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
- if err != nil {
- return nil, err
- }
-
- queue := &RedisQueue{
- ByteFIFOQueue: byteFIFOQueue,
- }
-
- queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
-
- return queue, nil
-}
-
-type redisClient interface {
- RPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd
- LPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd
- LPop(ctx context.Context, key string) *redis.StringCmd
- LLen(ctx context.Context, key string) *redis.IntCmd
- SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
- SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
- SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
- Ping(ctx context.Context) *redis.StatusCmd
- Close() error
-}
-
-var _ ByteFIFO = &RedisByteFIFO{}
-
-// RedisByteFIFO represents a ByteFIFO formed from a redisClient
-type RedisByteFIFO struct {
- client redisClient
-
- queueName string
-}
-
-// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
-type RedisByteFIFOConfiguration struct {
- ConnectionString string
- QueueName string
-}
-
-// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
-func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) {
- fifo := &RedisByteFIFO{
- queueName: config.QueueName,
- }
- fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString)
- if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil {
- return nil, err
- }
- return fifo, nil
-}
-
-// PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
- if fn != nil {
- if err := fn(); err != nil {
- return err
- }
- }
- return fifo.client.RPush(ctx, fifo.queueName, data).Err()
-}
-
-// PushBack pushes data to the top of the fifo
-func (fifo *RedisByteFIFO) PushBack(ctx context.Context, data []byte) error {
- return fifo.client.LPush(ctx, fifo.queueName, data).Err()
-}
-
-// Pop pops data from the start of the fifo
-func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) {
- data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
- if err == nil || err == redis.Nil {
- return data, nil
- }
- return data, err
-}
-
-// Close this fifo
-func (fifo *RedisByteFIFO) Close() error {
- return fifo.client.Close()
-}
-
-// Len returns the length of the fifo
-func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 {
- val, err := fifo.client.LLen(ctx, fifo.queueName).Result()
- if err != nil {
- log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
- return -1
- }
- return val
-}
-
-func init() {
- queuesMap[RedisQueueType] = NewRedisQueue
-}
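The removed RedisByteFIFO maps the queue onto a Redis list: RPUSH appends to the tail, LPUSH re-queues at the head, LPOP takes from the head, and LLEN reports the backlog. A hedged sketch of that mapping using go-redis v9; the address and key names are placeholders for illustration only.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	defer client.Close()

	const key = "example_queue"

	// enqueue at the tail, as PushFunc did
	if err := client.RPush(ctx, key, []byte("job-1")).Err(); err != nil {
		fmt.Println("rpush:", err)
		return
	}

	// dequeue from the head, as Pop did; redis.Nil just means the queue is empty
	data, err := client.LPop(ctx, key).Bytes()
	if err != nil && err != redis.Nil {
		fmt.Println("lpop:", err)
		return
	}
	fmt.Printf("popped %q, remaining %d\n", data, client.LLen(ctx, key).Val())
}
```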
diff --git a/modules/queue/queue_test.go b/modules/queue/queue_test.go
deleted file mode 100644
index 42d34c806c..0000000000
--- a/modules/queue/queue_test.go
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "testing"
-
- "code.gitea.io/gitea/modules/json"
-
- "github.com/stretchr/testify/assert"
-)
-
-type testData struct {
- TestString string
- TestInt int
-}
-
-func TestToConfig(t *testing.T) {
- cfg := testData{
- TestString: "Config",
- TestInt: 10,
- }
- exemplar := testData{}
-
- cfg2I, err := toConfig(exemplar, cfg)
- assert.NoError(t, err)
- cfg2, ok := (cfg2I).(testData)
- assert.True(t, ok)
- assert.NotEqual(t, cfg2, exemplar)
- assert.Equal(t, &cfg, &cfg2)
- cfgString, err := json.Marshal(cfg)
- assert.NoError(t, err)
-
- cfg3I, err := toConfig(exemplar, cfgString)
- assert.NoError(t, err)
- cfg3, ok := (cfg3I).(testData)
- assert.True(t, ok)
- assert.Equal(t, cfg.TestString, cfg3.TestString)
- assert.Equal(t, cfg.TestInt, cfg3.TestInt)
- assert.NotEqual(t, cfg3, exemplar)
-}
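The deleted test exercises toConfig, which accepts either a value of the concrete configuration type or its JSON encoding and returns the typed configuration. A small illustrative equivalent under the same assumptions; the names here are made up and are not the Gitea helper itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type queueConfig struct {
	TestString string
	TestInt    int
}

// toQueueConfig accepts the typed struct directly or its JSON bytes,
// mirroring the two paths the removed TestToConfig checked.
func toQueueConfig(cfg any) (queueConfig, error) {
	switch v := cfg.(type) {
	case queueConfig:
		return v, nil
	case []byte:
		var out queueConfig
		err := json.Unmarshal(v, &out)
		return out, err
	default:
		return queueConfig{}, fmt.Errorf("unsupported config type %T", cfg)
	}
}

func main() {
	raw, _ := json.Marshal(queueConfig{TestString: "Config", TestInt: 10})
	cfg, err := toQueueConfig(raw)
	fmt.Println(cfg, err)
}
```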
diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go
deleted file mode 100644
index 84d6dd98a5..0000000000
--- a/modules/queue/queue_wrapped.go
+++ /dev/null
@@ -1,315 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/util"
-)
-
-// WrappedQueueType is the type for a wrapped delayed starting queue
-const WrappedQueueType Type = "wrapped"
-
-// WrappedQueueConfiguration is the configuration for a WrappedQueue
-type WrappedQueueConfiguration struct {
- Underlying Type
- Timeout time.Duration
- MaxAttempts int
- Config interface{}
- QueueLength int
- Name string
-}
-
-type delayedStarter struct {
- internal Queue
- underlying Type
- cfg interface{}
- timeout time.Duration
- maxAttempts int
- name string
-}
-
-// setInternal must be called with the lock locked.
-func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc, exemplar interface{}) error {
- var ctx context.Context
- var cancel context.CancelFunc
- if q.timeout > 0 {
- ctx, cancel = context.WithTimeout(context.Background(), q.timeout)
- } else {
- ctx, cancel = context.WithCancel(context.Background())
- }
-
- defer cancel()
- // Ensure we also stop at shutdown
- atShutdown(cancel)
-
- i := 1
- for q.internal == nil {
- select {
- case <-ctx.Done():
- cfg := q.cfg
- if s, ok := cfg.([]byte); ok {
- cfg = string(s)
- }
-			return fmt.Errorf("timed out creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
- default:
- queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
- if err == nil {
- q.internal = queue
- break
- }
- if err.Error() != "resource temporarily unavailable" {
- if bs, ok := q.cfg.([]byte); ok {
- log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %s error: %v", i, q.underlying, q.name, string(bs), err)
- } else {
- log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %#v error: %v", i, q.underlying, q.name, q.cfg, err)
- }
- }
- i++
- if q.maxAttempts > 0 && i > q.maxAttempts {
- if bs, ok := q.cfg.([]byte); ok {
- return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %w", q.underlying, q.name, string(bs), err)
- }
- return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %w", q.underlying, q.name, q.cfg, err)
- }
- sleepTime := 100 * time.Millisecond
- if q.timeout > 0 && q.maxAttempts > 0 {
- sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts)
- }
- t := time.NewTimer(sleepTime)
- select {
- case <-ctx.Done():
- util.StopTimer(t)
- case <-t.C:
- }
- }
- }
- return nil
-}
-
-// WrappedQueue wraps a delayed starting queue
-type WrappedQueue struct {
- delayedStarter
- lock sync.Mutex
- handle HandlerFunc
- exemplar interface{}
- channel chan Data
- numInQueue int64
-}
-
-// NewWrappedQueue will attempt to create a queue of the provided type,
-// but if there is a problem creating this queue it will instead create
-// a WrappedQueue with delayed startup of the queue instead and a
-// channel which will be redirected to the queue
-func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(WrappedQueueConfiguration)
-
- queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
- if err == nil {
-		// Just return the queue; there is no need to wrap
- return queue, nil
- }
- if IsErrInvalidConfiguration(err) {
- // Retrying ain't gonna make this any better...
- return nil, ErrInvalidConfiguration{cfg: cfg}
- }
-
- queue = &WrappedQueue{
- handle: handle,
- channel: make(chan Data, config.QueueLength),
- exemplar: exemplar,
- delayedStarter: delayedStarter{
- cfg: config.Config,
- underlying: config.Underlying,
- timeout: config.Timeout,
- maxAttempts: config.MaxAttempts,
- name: config.Name,
- },
- }
- _ = GetManager().Add(queue, WrappedQueueType, config, exemplar)
- return queue, nil
-}
-
-// Name returns the name of the queue
-func (q *WrappedQueue) Name() string {
- return q.name + "-wrapper"
-}
-
-// Push will push the data to the internal channel checking it against the exemplar
-func (q *WrappedQueue) Push(data Data) error {
- if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
- }
- atomic.AddInt64(&q.numInQueue, 1)
- q.channel <- data
- return nil
-}
-
-func (q *WrappedQueue) flushInternalWithContext(ctx context.Context) error {
- q.lock.Lock()
- if q.internal == nil {
- q.lock.Unlock()
- return fmt.Errorf("not ready to flush wrapped queue %s yet", q.Name())
- }
- q.lock.Unlock()
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- return q.internal.FlushWithContext(ctx)
-}
-
-// Flush flushes the queue and blocks till the queue is empty
-func (q *WrappedQueue) Flush(timeout time.Duration) error {
- var ctx context.Context
- var cancel context.CancelFunc
- if timeout > 0 {
- ctx, cancel = context.WithTimeout(context.Background(), timeout)
- } else {
- ctx, cancel = context.WithCancel(context.Background())
- }
- defer cancel()
- return q.FlushWithContext(ctx)
-}
-
-// FlushWithContext implements the final part of Flushable
-func (q *WrappedQueue) FlushWithContext(ctx context.Context) error {
- log.Trace("WrappedQueue: %s FlushWithContext", q.Name())
- errChan := make(chan error, 1)
- go func() {
- errChan <- q.flushInternalWithContext(ctx)
- close(errChan)
- }()
-
- select {
- case err := <-errChan:
- return err
- case <-ctx.Done():
- go func() {
- <-errChan
- }()
- return ctx.Err()
- }
-}
-
-// IsEmpty checks whether the queue is empty
-func (q *WrappedQueue) IsEmpty() bool {
- if atomic.LoadInt64(&q.numInQueue) != 0 {
- return false
- }
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal == nil {
- return false
- }
- return q.internal.IsEmpty()
-}
-
-// Run starts to run the queue and attempts to create the internal queue
-func (q *WrappedQueue) Run(atShutdown, atTerminate func(func())) {
- log.Debug("WrappedQueue: %s Starting", q.name)
- q.lock.Lock()
- if q.internal == nil {
- err := q.setInternal(atShutdown, q.handle, q.exemplar)
- q.lock.Unlock()
- if err != nil {
- log.Fatal("Unable to set the internal queue for %s Error: %v", q.Name(), err)
- return
- }
- go func() {
- for data := range q.channel {
- _ = q.internal.Push(data)
- atomic.AddInt64(&q.numInQueue, -1)
- }
- }()
- } else {
- q.lock.Unlock()
- }
-
- q.internal.Run(atShutdown, atTerminate)
- log.Trace("WrappedQueue: %s Done", q.name)
-}
-
-// Shutdown this queue and stop processing
-func (q *WrappedQueue) Shutdown() {
- log.Trace("WrappedQueue: %s Shutting down", q.name)
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal == nil {
- return
- }
- if shutdownable, ok := q.internal.(Shutdownable); ok {
- shutdownable.Shutdown()
- }
- log.Debug("WrappedQueue: %s Shutdown", q.name)
-}
-
-// Terminate this queue and close the queue
-func (q *WrappedQueue) Terminate() {
- log.Trace("WrappedQueue: %s Terminating", q.name)
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal == nil {
- return
- }
- if shutdownable, ok := q.internal.(Shutdownable); ok {
- shutdownable.Terminate()
- }
- log.Debug("WrappedQueue: %s Terminated", q.name)
-}
-
-// IsPaused will return if the pool or queue is paused
-func (q *WrappedQueue) IsPaused() bool {
- q.lock.Lock()
- defer q.lock.Unlock()
- pausable, ok := q.internal.(Pausable)
- return ok && pausable.IsPaused()
-}
-
-// Pause will pause the pool or queue
-func (q *WrappedQueue) Pause() {
- q.lock.Lock()
- defer q.lock.Unlock()
- if pausable, ok := q.internal.(Pausable); ok {
- pausable.Pause()
- }
-}
-
-// Resume will resume the pool or queue
-func (q *WrappedQueue) Resume() {
- q.lock.Lock()
- defer q.lock.Unlock()
- if pausable, ok := q.internal.(Pausable); ok {
- pausable.Resume()
- }
-}
-
-// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
-func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
- q.lock.Lock()
- defer q.lock.Unlock()
- if pausable, ok := q.internal.(Pausable); ok {
- return pausable.IsPausedIsResumed()
- }
- return context.Background().Done(), closedChan
-}
-
-var closedChan chan struct{}
-
-func init() {
- queuesMap[WrappedQueueType] = NewWrappedQueue
- closedChan = make(chan struct{})
- close(closedChan)
-}
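The removed delayedStarter retries creation of the underlying queue until it succeeds, the timeout context expires, or the attempt limit is reached. A compact sketch of that retry loop under the same assumptions; newResource stands in for the NewQueue call and is illustrative only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// createWithRetry keeps trying until success, context cancellation,
// or maxAttempts failures, sleeping briefly between attempts.
func createWithRetry(ctx context.Context, maxAttempts int, newResource func() (string, error)) (string, error) {
	for attempt := 1; ; attempt++ {
		res, err := newResource()
		if err == nil {
			return res, nil
		}
		if maxAttempts > 0 && attempt >= maxAttempts {
			return "", fmt.Errorf("gave up after %d attempts: %w", attempt, err)
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	calls := 0
	res, err := createWithRetry(ctx, 10, func() (string, error) {
		calls++
		if calls < 3 {
			return "", errors.New("resource temporarily unavailable")
		}
		return "underlying-queue", nil
	})
	fmt.Println(res, err)
}
```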
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
deleted file mode 100644
index 1e5259fcfb..0000000000
--- a/modules/queue/setting.go
+++ /dev/null
@@ -1,126 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "fmt"
- "strings"
-
- "code.gitea.io/gitea/modules/json"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/setting"
-)
-
-func validType(t string) (Type, error) {
- if len(t) == 0 {
- return PersistableChannelQueueType, nil
- }
- for _, typ := range RegisteredTypes() {
- if t == string(typ) {
- return typ, nil
- }
- }
- return PersistableChannelQueueType, fmt.Errorf("unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
-}
-
-func getQueueSettings(name string) (setting.QueueSettings, []byte) {
- q := setting.GetQueueSettings(name)
- cfg, err := json.Marshal(q)
- if err != nil {
-		log.Error("Unable to marshal generic options: %v Error: %v", q, err)
-		log.Error("Unable to create queue for %s: %v", name, err)
- return q, []byte{}
- }
- return q, cfg
-}
-
-// CreateQueue for name with provided handler and exemplar
-func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
- q, cfg := getQueueSettings(name)
- if len(cfg) == 0 {
- return nil
- }
-
- typ, err := validType(q.Type)
- if err != nil {
- log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
- }
-
- returnable, err := NewQueue(typ, handle, cfg, exemplar)
- if q.WrapIfNecessary && err != nil {
- log.Warn("Unable to create queue for %s: %v", name, err)
- log.Warn("Attempting to create wrapped queue")
- returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{
- Underlying: typ,
- Timeout: q.Timeout,
- MaxAttempts: q.MaxAttempts,
- Config: cfg,
- QueueLength: q.QueueLength,
- Name: name,
- }, exemplar)
- }
- if err != nil {
- log.Error("Unable to create queue for %s: %v", name, err)
- return nil
- }
-
- // Sanity check configuration
- if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) {
- log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name)
- if pausable, ok := returnable.(Pausable); ok {
- log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name)
- pausable.Pause()
- }
- }
-
- return returnable
-}
-
-// CreateUniqueQueue for name with provided handler and exemplar
-func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
- q, cfg := getQueueSettings(name)
- if len(cfg) == 0 {
- return nil
- }
-
- if len(q.Type) > 0 && q.Type != "dummy" && q.Type != "immediate" && !strings.HasPrefix(q.Type, "unique-") {
- q.Type = "unique-" + q.Type
- }
-
- typ, err := validType(q.Type)
- if err != nil || typ == PersistableChannelQueueType {
- typ = PersistableChannelUniqueQueueType
- if err != nil {
- log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
- }
- }
-
- returnable, err := NewQueue(typ, handle, cfg, exemplar)
- if q.WrapIfNecessary && err != nil {
- log.Warn("Unable to create unique queue for %s: %v", name, err)
- log.Warn("Attempting to create wrapped queue")
- returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{
- Underlying: typ,
- Timeout: q.Timeout,
- MaxAttempts: q.MaxAttempts,
- Config: cfg,
- QueueLength: q.QueueLength,
- }, exemplar)
- }
- if err != nil {
- log.Error("Unable to create unique queue for %s: %v", name, err)
- return nil
- }
-
- // Sanity check configuration
- if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) {
- log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name)
- if pausable, ok := returnable.(Pausable); ok {
- log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name)
- pausable.Pause()
- }
- }
-
- return returnable.(UniqueQueue)
-}
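The removed CreateQueue and CreateUniqueQueue try the configured backend first and, when creation fails and WrapIfNecessary is set, fall back to the delayed-start wrapped queue. A stripped-down illustration of that fallback decision; the constructor callbacks are placeholders, not the Gitea NewQueue API.

```go
package main

import (
	"errors"
	"fmt"
)

// createQueue tries the direct backend first and only falls back to the
// wrapped constructor when wrapping is allowed and the first attempt failed.
func createQueue(name string, wrapIfNecessary bool, newDirect, newWrapped func() (string, error)) (string, error) {
	q, err := newDirect()
	if err != nil && wrapIfNecessary {
		fmt.Printf("unable to create queue for %s: %v; attempting wrapped queue\n", name, err)
		q, err = newWrapped()
	}
	return q, err
}

func main() {
	q, err := createQueue("mirror", true,
		func() (string, error) { return "", errors.New("redis unavailable") },
		func() (string, error) { return "wrapped(redis)", nil },
	)
	fmt.Println(q, err)
}
```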
diff --git a/modules/queue/testhelper.go b/modules/queue/testhelper.go
new file mode 100644
index 0000000000..edfa438b1a
--- /dev/null
+++ b/modules/queue/testhelper.go
@@ -0,0 +1,40 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "fmt"
+ "sync"
+)
+
+// testStateRecorder is used to record state changes for testing, to help debug async behaviors
+type testStateRecorder struct {
+ records []string
+ mu sync.Mutex
+}
+
+var testRecorder = &testStateRecorder{}
+
+func (t *testStateRecorder) Record(format string, args ...any) {
+ t.mu.Lock()
+ t.records = append(t.records, fmt.Sprintf(format, args...))
+ if len(t.records) > 1000 {
+ t.records = t.records[len(t.records)-1000:]
+ }
+ t.mu.Unlock()
+}
+
+func (t *testStateRecorder) Records() []string {
+ t.mu.Lock()
+ r := make([]string, len(t.records))
+ copy(r, t.records)
+ t.mu.Unlock()
+ return r
+}
+
+func (t *testStateRecorder) Reset() {
+ t.mu.Lock()
+ t.records = nil
+ t.mu.Unlock()
+}
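A sketch of how the recorder added above might be used from a test in the same package to assert the order of asynchronous state changes; the event names are invented for illustration.

```go
package queue

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestStateRecorderUsage(t *testing.T) {
	testRecorder.Reset()
	testRecorder.Record("worker-start: %d", 1)
	testRecorder.Record("worker-stop: %d", 1)
	assert.Equal(t, []string{"worker-start: 1", "worker-stop: 1"}, testRecorder.Records())
}
```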
diff --git a/modules/queue/unique_queue.go b/modules/queue/unique_queue.go
deleted file mode 100644
index 8f8215c71d..0000000000
--- a/modules/queue/unique_queue.go
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright 2020 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "fmt"
-)
-
-// UniqueQueue defines a queue which guarantees only one instance of same
-// data is in the queue. Instances with same identity will be
-// discarded if there is already one in the line.
-//
-// This queue is particularly useful for preventing duplicated tasks
-// of the same purpose - please note that this does not guarantee that a particular
-// task cannot be processed twice or more at the same time. Uniqueness is
-// only guaranteed whilst the task is waiting in the queue.
-//
-// Users of this queue should be careful to push only the identifier of the
-// data
-type UniqueQueue interface {
- Queue
- PushFunc(Data, func() error) error
- Has(Data) (bool, error)
-}
-
-// ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue
-var ErrAlreadyInQueue = fmt.Errorf("already in queue")
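The UniqueQueue contract above deduplicates only while an item is waiting: once it has been handed to a worker, the same payload may be queued again. A minimal illustrative implementation of that push-time dedup (not the Gitea ChannelUniqueQueue):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrAlreadyQueued mirrors the ErrAlreadyInQueue sentinel above.
var ErrAlreadyQueued = errors.New("already in queue")

// uniquePusher guards a channel with a set of payloads, so identical
// items are rejected while one of them is still waiting.
type uniquePusher struct {
	mu    sync.Mutex
	table map[string]struct{}
	ch    chan string
}

func newUniquePusher(capacity int) *uniquePusher {
	return &uniquePusher{table: map[string]struct{}{}, ch: make(chan string, capacity)}
}

func (q *uniquePusher) Push(item string) error {
	q.mu.Lock()
	if _, ok := q.table[item]; ok {
		q.mu.Unlock()
		return ErrAlreadyQueued
	}
	q.table[item] = struct{}{}
	q.mu.Unlock()
	q.ch <- item
	return nil
}

// Pop removes the item from the dedup table as it hands it out, so the
// same payload may be queued again once it has left the queue.
func (q *uniquePusher) Pop() string {
	item := <-q.ch
	q.mu.Lock()
	delete(q.table, item)
	q.mu.Unlock()
	return item
}

func main() {
	q := newUniquePusher(4)
	fmt.Println(q.Push("repo-1")) // <nil>
	fmt.Println(q.Push("repo-1")) // already in queue
	fmt.Println(q.Pop())          // repo-1
	fmt.Println(q.Push("repo-1")) // <nil>: uniqueness only applies while queued
}
```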
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
deleted file mode 100644
index 62c051aa39..0000000000
--- a/modules/queue/unique_queue_channel.go
+++ /dev/null
@@ -1,212 +0,0 @@
-// Copyright 2020 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
- "fmt"
- "runtime/pprof"
- "sync"
- "time"
-
- "code.gitea.io/gitea/modules/container"
- "code.gitea.io/gitea/modules/json"
- "code.gitea.io/gitea/modules/log"
-)
-
-// ChannelUniqueQueueType is the type for channel queue
-const ChannelUniqueQueueType Type = "unique-channel"
-
-// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
-type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
-
-// ChannelUniqueQueue implements UniqueQueue
-//
-// It is basically a thin wrapper around a WorkerPool but keeps a store of
-// what has been pushed within a table.
-//
-// Please note that this Queue does not guarantee that a particular
-// task cannot be processed twice or more at the same time. Uniqueness is
-// only guaranteed whilst the task is waiting in the queue.
-type ChannelUniqueQueue struct {
- *WorkerPool
- lock sync.Mutex
- table container.Set[string]
- shutdownCtx context.Context
- shutdownCtxCancel context.CancelFunc
- terminateCtx context.Context
- terminateCtxCancel context.CancelFunc
- exemplar interface{}
- workers int
- name string
-}
-
-// NewChannelUniqueQueue create a memory channel queue
-func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(ChannelUniqueQueueConfiguration)
- if config.BatchLength == 0 {
- config.BatchLength = 1
- }
-
- terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
- shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
-
- queue := &ChannelUniqueQueue{
- table: make(container.Set[string]),
- shutdownCtx: shutdownCtx,
- shutdownCtxCancel: shutdownCtxCancel,
- terminateCtx: terminateCtx,
- terminateCtxCancel: terminateCtxCancel,
- exemplar: exemplar,
- workers: config.Workers,
- name: config.Name,
- }
- queue.WorkerPool = NewWorkerPool(func(data ...Data) (unhandled []Data) {
- for _, datum := range data {
- // No error is possible here because PushFunc ensures that this can be marshalled
- bs, _ := json.Marshal(datum)
-
- queue.lock.Lock()
- queue.table.Remove(string(bs))
- queue.lock.Unlock()
-
- if u := handle(datum); u != nil {
- if queue.IsPaused() {
- // We can only pushback to the channel if we're paused.
- go func() {
- if err := queue.Push(u[0]); err != nil {
- log.Error("Unable to push back to queue %d. Error: %v", queue.qid, err)
- }
- }()
- } else {
- unhandled = append(unhandled, u...)
- }
- }
- }
- return unhandled
- }, config.WorkerPoolConfiguration)
-
- queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
- return queue, nil
-}
-
-// Run starts to run the queue
-func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
- pprof.SetGoroutineLabels(q.baseCtx)
- atShutdown(q.Shutdown)
- atTerminate(q.Terminate)
- log.Debug("ChannelUniqueQueue: %s Starting", q.name)
- _ = q.AddWorkers(q.workers, 0)
-}
-
-// Push will push data into the queue if the data is not already in the queue
-func (q *ChannelUniqueQueue) Push(data Data) error {
- return q.PushFunc(data, nil)
-}
-
-// PushFunc will push data into the queue
-func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
- if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
- }
-
- bs, err := json.Marshal(data)
- if err != nil {
- return err
- }
- q.lock.Lock()
- locked := true
- defer func() {
- if locked {
- q.lock.Unlock()
- }
- }()
- if !q.table.Add(string(bs)) {
- return ErrAlreadyInQueue
- }
- // FIXME: We probably need to implement some sort of limit here
- // If the downstream queue blocks this table will grow without limit
- if fn != nil {
- err := fn()
- if err != nil {
- q.table.Remove(string(bs))
- return err
- }
- }
- locked = false
- q.lock.Unlock()
- q.WorkerPool.Push(data)
- return nil
-}
-
-// Has checks if the data is in the queue
-func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
- bs, err := json.Marshal(data)
- if err != nil {
- return false, err
- }
-
- q.lock.Lock()
- defer q.lock.Unlock()
- return q.table.Contains(string(bs)), nil
-}
-
-// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
-func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
- if q.IsPaused() {
- return nil
- }
- ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
- defer cancel()
- return q.FlushWithContext(ctx)
-}
-
-// Shutdown processing from this queue
-func (q *ChannelUniqueQueue) Shutdown() {
- log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
- select {
- case <-q.shutdownCtx.Done():
- return
- default:
- }
- go func() {
- log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
- if err := q.FlushWithContext(q.terminateCtx); err != nil {
- if !q.IsEmpty() {
- log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
- }
- return
- }
- log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
- }()
- q.shutdownCtxCancel()
- log.Debug("ChannelUniqueQueue: %s Shutdown", q.name)
-}
-
-// Terminate this queue and close the queue
-func (q *ChannelUniqueQueue) Terminate() {
- log.Trace("ChannelUniqueQueue: %s Terminating", q.name)
- q.Shutdown()
- select {
- case <-q.terminateCtx.Done():
- return
- default:
- }
- q.terminateCtxCancel()
- q.baseCtxFinished()
- log.Debug("ChannelUniqueQueue: %s Terminated", q.name)
-}
-
-// Name returns the name of this queue
-func (q *ChannelUniqueQueue) Name() string {
- return q.name
-}
-
-func init() {
- queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue
-}
diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go
deleted file mode 100644
index 824015b834..0000000000
--- a/modules/queue/unique_queue_channel_test.go
+++ /dev/null
@@ -1,258 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "sync"
- "testing"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestChannelUniqueQueue(t *testing.T) {
- _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- nilFn := func(_ func()) {}
-
- queue, err := NewChannelUniqueQueue(handle,
- ChannelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 0,
- MaxWorkers: 10,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 5,
- Name: "TestChannelQueue",
- },
- Workers: 0,
- }, &testData{})
- assert.NoError(t, err)
-
- assert.Equal(t, queue.(*ChannelUniqueQueue).WorkerPool.boostWorkers, 5)
-
- go queue.Run(nilFn, nilFn)
-
- test1 := testData{"A", 1}
- go queue.Push(&test1)
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- err = queue.Push(test1)
- assert.Error(t, err)
-}
-
-func TestChannelUniqueQueue_Batch(t *testing.T) {
- _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
-
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
-
- nilFn := func(_ func()) {}
-
- queue, err := NewChannelUniqueQueue(handle,
- ChannelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 2,
- BlockTimeout: 0,
- BoostTimeout: 0,
- BoostWorkers: 0,
- MaxWorkers: 10,
- },
- Workers: 1,
- }, &testData{})
- assert.NoError(t, err)
-
- go queue.Run(nilFn, nilFn)
-
- test1 := testData{"A", 1}
- test2 := testData{"B", 2}
-
- queue.Push(&test1)
- go queue.Push(&test2)
-
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- result2 := <-handleChan
- assert.Equal(t, test2.TestString, result2.TestString)
- assert.Equal(t, test2.TestInt, result2.TestInt)
-
- err = queue.Push(test1)
- assert.Error(t, err)
-}
-
-func TestChannelUniqueQueue_Pause(t *testing.T) {
- _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
-
- lock := sync.Mutex{}
- var queue Queue
- var err error
- pushBack := false
- handleChan := make(chan *testData)
- handle := func(data ...Data) []Data {
- lock.Lock()
- if pushBack {
- if pausable, ok := queue.(Pausable); ok {
- pausable.Pause()
- }
- pushBack = false
- lock.Unlock()
- return data
- }
- lock.Unlock()
-
- for _, datum := range data {
- testDatum := datum.(*testData)
- handleChan <- testDatum
- }
- return nil
- }
- nilFn := func(_ func()) {}
-
- queue, err = NewChannelUniqueQueue(handle,
- ChannelQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: 20,
- BatchLength: 1,
- BlockTimeout: 0,
- BoostTimeout: 0,
- BoostWorkers: 0,
- MaxWorkers: 10,
- },
- Workers: 1,
- }, &testData{})
- assert.NoError(t, err)
-
- go queue.Run(nilFn, nilFn)
-
- test1 := testData{"A", 1}
- test2 := testData{"B", 2}
- queue.Push(&test1)
-
- pausable, ok := queue.(Pausable)
- if !assert.True(t, ok) {
- return
- }
- result1 := <-handleChan
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-
- pausable.Pause()
-
- paused, resumed := pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
- assert.Fail(t, "Queue is not paused")
- return
- }
-
- queue.Push(&test2)
-
- var result2 *testData
- select {
- case result2 = <-handleChan:
- assert.Fail(t, "handler chan should be empty")
- case <-time.After(100 * time.Millisecond):
- }
-
- assert.Nil(t, result2)
-
- pausable.Resume()
-
- select {
- case <-resumed:
- default:
- assert.Fail(t, "Queue should be resumed")
- }
-
- select {
- case result2 = <-handleChan:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "handler chan should contain test2")
- }
-
- assert.Equal(t, test2.TestString, result2.TestString)
- assert.Equal(t, test2.TestInt, result2.TestInt)
-
- lock.Lock()
- pushBack = true
- lock.Unlock()
-
- paused, resumed = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- assert.Fail(t, "Queue should not be paused")
- return
- case <-resumed:
- default:
- assert.Fail(t, "Queue is not resumed")
- return
- }
-
- queue.Push(&test1)
-
- select {
- case <-paused:
- case <-handleChan:
- assert.Fail(t, "handler chan should not contain test1")
- return
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "queue should be paused")
- return
- }
-
- paused, resumed = pausable.IsPausedIsResumed()
-
- select {
- case <-paused:
- case <-resumed:
- assert.Fail(t, "Queue should not be resumed")
- return
- default:
- assert.Fail(t, "Queue is not paused")
- return
- }
-
- pausable.Resume()
-
- select {
- case <-resumed:
- default:
- assert.Fail(t, "Queue should be resumed")
- }
-
- select {
- case result1 = <-handleChan:
- case <-time.After(500 * time.Millisecond):
- assert.Fail(t, "handler chan should contain test1")
- }
- assert.Equal(t, test1.TestString, result1.TestString)
- assert.Equal(t, test1.TestInt, result1.TestInt)
-}
diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go
deleted file mode 100644
index 406f64784c..0000000000
--- a/modules/queue/unique_queue_disk.go
+++ /dev/null
@@ -1,128 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
-
- "code.gitea.io/gitea/modules/nosql"
-
- "gitea.com/lunny/levelqueue"
-)
-
-// LevelUniqueQueueType is the type for level queue
-const LevelUniqueQueueType Type = "unique-level"
-
-// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
-type LevelUniqueQueueConfiguration struct {
- ByteFIFOQueueConfiguration
- DataDir string
- ConnectionString string
- QueueName string
-}
-
-// LevelUniqueQueue implements a disk library queue
-type LevelUniqueQueue struct {
- *ByteFIFOUniqueQueue
-}
-
-// NewLevelUniqueQueue creates a ledis local queue
-//
-// Please note that this Queue does not guarantee that a particular
-// task cannot be processed twice or more at the same time. Uniqueness is
-// only guaranteed whilst the task is waiting in the queue.
-func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(LevelUniqueQueueConfiguration)
-
- if len(config.ConnectionString) == 0 {
- config.ConnectionString = config.DataDir
- }
- config.WaitOnEmpty = true
-
- byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
- if err != nil {
- return nil, err
- }
-
- byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
- if err != nil {
- return nil, err
- }
-
- queue := &LevelUniqueQueue{
- ByteFIFOUniqueQueue: byteFIFOQueue,
- }
- queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar)
- return queue, nil
-}
-
-var _ UniqueByteFIFO = &LevelUniqueQueueByteFIFO{}
-
-// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
-type LevelUniqueQueueByteFIFO struct {
- internal *levelqueue.UniqueQueue
- connection string
-}
-
-// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
-func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error) {
- db, err := nosql.GetManager().GetLevelDB(connection)
- if err != nil {
- return nil, err
- }
-
- internal, err := levelqueue.NewUniqueQueue(db, []byte(prefix), []byte(prefix+"-unique"), false)
- if err != nil {
- return nil, err
- }
-
- return &LevelUniqueQueueByteFIFO{
- connection: connection,
- internal: internal,
- }, nil
-}
-
-// PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
- return fifo.internal.LPushFunc(data, fn)
-}
-
-// PushBack pushes data to the top of the fifo
-func (fifo *LevelUniqueQueueByteFIFO) PushBack(ctx context.Context, data []byte) error {
- return fifo.internal.RPush(data)
-}
-
-// Pop pops data from the start of the fifo
-func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
- data, err := fifo.internal.RPop()
- if err != nil && err != levelqueue.ErrNotFound {
- return nil, err
- }
- return data, nil
-}
-
-// Len returns the length of the fifo
-func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 {
- return fifo.internal.Len()
-}
-
-// Has returns whether the fifo contains this data
-func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
- return fifo.internal.Has(data)
-}
-
-// Close this fifo
-func (fifo *LevelUniqueQueueByteFIFO) Close() error {
- err := fifo.internal.Close()
- _ = nosql.GetManager().CloseLevelDB(fifo.connection)
- return err
-}
-
-func init() {
- queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
-}
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
deleted file mode 100644
index cc8a807c67..0000000000
--- a/modules/queue/unique_queue_disk_channel.go
+++ /dev/null
@@ -1,336 +0,0 @@
-// Copyright 2020 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
- "runtime/pprof"
- "sync"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-)
-
-// PersistableChannelUniqueQueueType is the type for persistable queue
-const PersistableChannelUniqueQueueType Type = "unique-persistable-channel"
-
-// PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue
-type PersistableChannelUniqueQueueConfiguration struct {
- Name string
- DataDir string
- BatchLength int
- QueueLength int
- Timeout time.Duration
- MaxAttempts int
- Workers int
- MaxWorkers int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
-}
-
-// PersistableChannelUniqueQueue wraps a channel queue and level queue together
-//
-// Please note that this Queue does not guarantee that a particular
-// task cannot be processed twice or more at the same time. Uniqueness is
-// only guaranteed whilst the task is waiting in the queue.
-type PersistableChannelUniqueQueue struct {
- channelQueue *ChannelUniqueQueue
- delayedStarter
- lock sync.Mutex
- closed chan struct{}
-}
-
-// NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
-// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
-func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(PersistableChannelUniqueQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(PersistableChannelUniqueQueueConfiguration)
-
- queue := &PersistableChannelUniqueQueue{
- closed: make(chan struct{}),
- }
-
- wrappedHandle := func(data ...Data) (failed []Data) {
- for _, unhandled := range handle(data...) {
- if fail := queue.PushBack(unhandled); fail != nil {
- failed = append(failed, fail)
- }
- }
- return failed
- }
-
- channelUniqueQueue, err := NewChannelUniqueQueue(wrappedHandle, ChannelUniqueQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: config.QueueLength,
- BatchLength: config.BatchLength,
- BlockTimeout: config.BlockTimeout,
- BoostTimeout: config.BoostTimeout,
- BoostWorkers: config.BoostWorkers,
- MaxWorkers: config.MaxWorkers,
- Name: config.Name + "-channel",
- },
- Workers: config.Workers,
- }, exemplar)
- if err != nil {
- return nil, err
- }
-
- // the level backend only needs temporary workers to catch up with the previously dropped work
- levelCfg := LevelUniqueQueueConfiguration{
- ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
- WorkerPoolConfiguration: WorkerPoolConfiguration{
- QueueLength: config.QueueLength,
- BatchLength: config.BatchLength,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 1,
- MaxWorkers: 5,
- Name: config.Name + "-level",
- },
- Workers: 0,
- },
- DataDir: config.DataDir,
- QueueName: config.Name + "-level",
- }
-
- queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
-
- levelQueue, err := NewLevelUniqueQueue(func(data ...Data) []Data {
- for _, datum := range data {
- err := queue.Push(datum)
- if err != nil && err != ErrAlreadyInQueue {
-				log.Error("Unable to push to channelled queue: %v", err)
- }
- }
- return nil
- }, levelCfg, exemplar)
- if err == nil {
- queue.delayedStarter = delayedStarter{
- internal: levelQueue.(*LevelUniqueQueue),
- name: config.Name,
- }
-
- _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
- return queue, nil
- }
- if IsErrInvalidConfiguration(err) {
- // Retrying ain't gonna make this any better...
- return nil, ErrInvalidConfiguration{cfg: cfg}
- }
-
- queue.delayedStarter = delayedStarter{
- cfg: levelCfg,
- underlying: LevelUniqueQueueType,
- timeout: config.Timeout,
- maxAttempts: config.MaxAttempts,
- name: config.Name,
- }
- _ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
- return queue, nil
-}
-
-// Name returns the name of this queue
-func (q *PersistableChannelUniqueQueue) Name() string {
- return q.delayedStarter.name
-}
-
-// Push will push the indexer data to queue
-func (q *PersistableChannelUniqueQueue) Push(data Data) error {
- return q.PushFunc(data, nil)
-}
-
-// PushFunc will push the indexer data to queue
-func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
- select {
- case <-q.closed:
- return q.internal.(UniqueQueue).PushFunc(data, fn)
- default:
- return q.channelQueue.PushFunc(data, fn)
- }
-}
-
-// PushBack will push the indexer data to queue
-func (q *PersistableChannelUniqueQueue) PushBack(data Data) error {
- select {
- case <-q.closed:
- if pbr, ok := q.internal.(PushBackable); ok {
- return pbr.PushBack(data)
- }
- return q.internal.Push(data)
- default:
- return q.channelQueue.Push(data)
- }
-}
-
-// Has will test if the queue has the data
-func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
- // This is more difficult...
- has, err := q.channelQueue.Has(data)
- if err != nil || has {
- return has, err
- }
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal == nil {
- return false, nil
- }
- return q.internal.(UniqueQueue).Has(data)
-}
-
-// Run starts to run the queue
-func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) {
- pprof.SetGoroutineLabels(q.channelQueue.baseCtx)
- log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
-
- q.lock.Lock()
- if q.internal == nil {
- err := q.setInternal(atShutdown, func(data ...Data) []Data {
- for _, datum := range data {
- err := q.Push(datum)
- if err != nil && err != ErrAlreadyInQueue {
-					log.Error("Unable to push to channelled queue: %v", err)
- }
- }
- return nil
- }, q.channelQueue.exemplar)
- q.lock.Unlock()
- if err != nil {
- log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
- return
- }
- } else {
- q.lock.Unlock()
- }
- atShutdown(q.Shutdown)
- atTerminate(q.Terminate)
- _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
-
- if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
- // Just run the level queue - we shut it down once it's flushed
- go luq.Run(func(_ func()) {}, func(_ func()) {})
- go func() {
- _ = luq.Flush(0)
- for !luq.IsEmpty() {
- _ = luq.Flush(0)
- select {
- case <-time.After(100 * time.Millisecond):
- case <-luq.shutdownCtx.Done():
- if luq.byteFIFO.Len(luq.terminateCtx) > 0 {
- log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
- }
- return
- }
- }
- log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
- luq.Shutdown()
- GetManager().Remove(luq.qid)
- }()
- } else {
- log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
- _ = q.internal.Flush(0)
- q.internal.(*LevelUniqueQueue).Shutdown()
- GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
- }
-}
-
-// Flush flushes the queue
-func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
- return q.channelQueue.Flush(timeout)
-}
-
-// FlushWithContext flushes the queue
-func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
- return q.channelQueue.FlushWithContext(ctx)
-}
-
-// IsEmpty checks if a queue is empty
-func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
- return q.channelQueue.IsEmpty()
-}
-
-// IsPaused will return if the pool or queue is paused
-func (q *PersistableChannelUniqueQueue) IsPaused() bool {
- return q.channelQueue.IsPaused()
-}
-
-// Pause will pause the pool or queue
-func (q *PersistableChannelUniqueQueue) Pause() {
- q.channelQueue.Pause()
-}
-
-// Resume will resume the pool or queue
-func (q *PersistableChannelUniqueQueue) Resume() {
- q.channelQueue.Resume()
-}
-
-// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
-func (q *PersistableChannelUniqueQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
- return q.channelQueue.IsPausedIsResumed()
-}
-
-// Shutdown processing this queue
-func (q *PersistableChannelUniqueQueue) Shutdown() {
- log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
- q.lock.Lock()
- select {
- case <-q.closed:
- q.lock.Unlock()
- return
- default:
- if q.internal != nil {
- q.internal.(*LevelUniqueQueue).Shutdown()
- }
- close(q.closed)
- q.lock.Unlock()
- }
-
- log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
- q.internal.(*LevelUniqueQueue).baseCtxCancel()
- q.channelQueue.baseCtxCancel()
- log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
- q.channelQueue.Wait()
- q.internal.(*LevelUniqueQueue).Wait()
- // Redirect all remaining data in the chan to the internal channel
- close(q.channelQueue.dataChan)
- log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
- countOK, countLost := 0, 0
- for data := range q.channelQueue.dataChan {
- err := q.internal.(*LevelUniqueQueue).Push(data)
- if err != nil {
-			log.Error("PersistableChannelUniqueQueue: %s Unable to redirect %v due to: %v", q.delayedStarter.name, data, err)
- countLost++
- } else {
- countOK++
- }
- }
- if countLost > 0 {
- log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
- } else if countOK > 0 {
- log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
- }
- log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
-
- log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
-}
-
-// Terminate this queue and close the queue
-func (q *PersistableChannelUniqueQueue) Terminate() {
- log.Trace("PersistableChannelUniqueQueue: %s Terminating", q.delayedStarter.name)
- q.Shutdown()
- q.lock.Lock()
- defer q.lock.Unlock()
- if q.internal != nil {
- q.internal.(*LevelUniqueQueue).Terminate()
- }
- q.channelQueue.baseCtxFinished()
- log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
-}
-
-func init() {
- queuesMap[PersistableChannelUniqueQueueType] = NewPersistableChannelUniqueQueue
-}
diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go
deleted file mode 100644
index 11a1d4b88d..0000000000
--- a/modules/queue/unique_queue_disk_channel_test.go
+++ /dev/null
@@ -1,265 +0,0 @@
-// Copyright 2023 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "strconv"
- "sync"
- "sync/atomic"
- "testing"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestPersistableChannelUniqueQueue(t *testing.T) {
- // Create a temporary directory for the queue
- tmpDir := t.TempDir()
- _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
-
- // Common function to create the Queue
- newQueue := func(name string, handle func(data ...Data) []Data) Queue {
- q, err := NewPersistableChannelUniqueQueue(handle,
- PersistableChannelUniqueQueueConfiguration{
- Name: name,
- DataDir: tmpDir,
- QueueLength: 200,
- MaxWorkers: 1,
- BlockTimeout: 1 * time.Second,
- BoostTimeout: 5 * time.Minute,
- BoostWorkers: 1,
- Workers: 0,
- }, "task-0")
- assert.NoError(t, err)
- return q
- }
-
- // runs the provided queue and provides some timer function
- type channels struct {
- readyForShutdown chan struct{} // closed when shutdown functions have been assigned
- readyForTerminate chan struct{} // closed when terminate functions have been assigned
- signalShutdown chan struct{} // Should close to signal shutdown
- doneShutdown chan struct{} // closed when shutdown function is done
- queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock
- }
- runQueue := func(q Queue, lock *sync.Mutex) *channels {
- chans := &channels{
- readyForShutdown: make(chan struct{}),
- readyForTerminate: make(chan struct{}),
- signalShutdown: make(chan struct{}),
- doneShutdown: make(chan struct{}),
- }
- go q.Run(func(atShutdown func()) {
- go func() {
- lock.Lock()
- select {
- case <-chans.readyForShutdown:
- default:
- close(chans.readyForShutdown)
- }
- lock.Unlock()
- <-chans.signalShutdown
- atShutdown()
- close(chans.doneShutdown)
- }()
- }, func(atTerminate func()) {
- lock.Lock()
- defer lock.Unlock()
- select {
- case <-chans.readyForTerminate:
- default:
- close(chans.readyForTerminate)
- }
- chans.queueTerminate = append(chans.queueTerminate, atTerminate)
- })
-
- return chans
- }
-
- // call to shutdown and terminate the queue associated with the channels
- doTerminate := func(chans *channels, lock *sync.Mutex) {
- <-chans.readyForTerminate
-
- lock.Lock()
- callbacks := []func(){}
- callbacks = append(callbacks, chans.queueTerminate...)
- lock.Unlock()
-
- for _, callback := range callbacks {
- callback()
- }
- }
-
- mapLock := sync.Mutex{}
- executedInitial := map[string][]string{}
- hasInitial := map[string][]string{}
-
- fillQueue := func(name string, done chan int64) {
- t.Run("Initial Filling: "+name, func(t *testing.T) {
- lock := sync.Mutex{}
-
- startAt100Queued := make(chan struct{})
- stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
-
- handle := func(data ...Data) []Data {
- <-startAt100Queued
- for _, datum := range data {
- s := datum.(string)
- mapLock.Lock()
- executedInitial[name] = append(executedInitial[name], s)
- mapLock.Unlock()
- if s == "task-20" {
- close(stopAt20Shutdown)
- }
- }
- return nil
- }
-
- q := newQueue(name, handle)
-
- // add 100 tasks to the queue
- for i := 0; i < 100; i++ {
- _ = q.Push("task-" + strconv.Itoa(i))
- }
- close(startAt100Queued)
-
- chans := runQueue(q, &lock)
-
- <-chans.readyForShutdown
- <-stopAt20Shutdown
- close(chans.signalShutdown)
- <-chans.doneShutdown
- _ = q.Push("final")
-
- // check which tasks are still in the queue
- for i := 0; i < 100; i++ {
- if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
- mapLock.Lock()
- hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i))
- mapLock.Unlock()
- }
- }
- if has, _ := q.(UniqueQueue).Has("final"); has {
- mapLock.Lock()
- hasInitial[name] = append(hasInitial[name], "final")
- mapLock.Unlock()
- } else {
-				assert.Fail(t, "UniqueQueue %s should have \"final\"", name)
- }
- doTerminate(chans, &lock)
- mapLock.Lock()
- assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
- mapLock.Unlock()
- })
- mapLock.Lock()
- count := int64(len(hasInitial[name]))
- mapLock.Unlock()
- done <- count
- close(done)
- }
-
- hasQueueAChan := make(chan int64)
- hasQueueBChan := make(chan int64)
-
- go fillQueue("QueueA", hasQueueAChan)
- go fillQueue("QueueB", hasQueueBChan)
-
- hasA := <-hasQueueAChan
- hasB := <-hasQueueBChan
-
- executedEmpty := map[string][]string{}
- hasEmpty := map[string][]string{}
- emptyQueue := func(name string, numInQueue int64, done chan struct{}) {
- t.Run("Empty Queue: "+name, func(t *testing.T) {
- lock := sync.Mutex{}
- stop := make(chan struct{})
-
- // collect the tasks that have been executed
- atomicCount := int64(0)
- handle := func(data ...Data) []Data {
- lock.Lock()
- for _, datum := range data {
- mapLock.Lock()
- executedEmpty[name] = append(executedEmpty[name], datum.(string))
- mapLock.Unlock()
- count := atomic.AddInt64(&atomicCount, 1)
- if count >= numInQueue {
- close(stop)
- }
- }
- lock.Unlock()
- return nil
- }
-
- q := newQueue(name, handle)
- chans := runQueue(q, &lock)
-
- <-chans.readyForShutdown
- <-stop
- close(chans.signalShutdown)
- <-chans.doneShutdown
-
- // check which tasks are still in the queue
- for i := 0; i < 100; i++ {
- if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
- mapLock.Lock()
- hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i))
- mapLock.Unlock()
- }
- }
- doTerminate(chans, &lock)
-
- mapLock.Lock()
- assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name]))
- assert.Empty(t, hasEmpty[name])
- mapLock.Unlock()
- })
- close(done)
- }
-
- doneA := make(chan struct{})
- doneB := make(chan struct{})
-
- go emptyQueue("QueueA", hasA, doneA)
- go emptyQueue("QueueB", hasB, doneB)
-
- <-doneA
- <-doneB
-
- mapLock.Lock()
- t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
- len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
-
- // reset and rerun
- executedInitial = map[string][]string{}
- hasInitial = map[string][]string{}
- executedEmpty = map[string][]string{}
- hasEmpty = map[string][]string{}
- mapLock.Unlock()
-
- hasQueueAChan = make(chan int64)
- hasQueueBChan = make(chan int64)
-
- go fillQueue("QueueA", hasQueueAChan)
- go fillQueue("QueueB", hasQueueBChan)
-
- hasA = <-hasQueueAChan
- hasB = <-hasQueueBChan
-
- doneA = make(chan struct{})
- doneB = make(chan struct{})
-
- go emptyQueue("QueueA", hasA, doneA)
- go emptyQueue("QueueB", hasB, doneB)
-
- <-doneA
- <-doneB
-
- mapLock.Lock()
- t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
- len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
- mapLock.Unlock()
-}
diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go
deleted file mode 100644
index ae1df08ebd..0000000000
--- a/modules/queue/unique_queue_redis.go
+++ /dev/null
@@ -1,141 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
-
- "github.com/redis/go-redis/v9"
-)
-
-// RedisUniqueQueueType is the type for redis queue
-const RedisUniqueQueueType Type = "unique-redis"
-
-// RedisUniqueQueue redis queue
-type RedisUniqueQueue struct {
- *ByteFIFOUniqueQueue
-}
-
-// RedisUniqueQueueConfiguration is the configuration for the redis queue
-type RedisUniqueQueueConfiguration struct {
- ByteFIFOQueueConfiguration
- RedisUniqueByteFIFOConfiguration
-}
-
-// NewRedisUniqueQueue creates single redis or cluster redis queue.
-//
-// Please note that this Queue does not guarantee that a particular
-// task cannot be processed twice or more at the same time. Uniqueness is
-// only guaranteed whilst the task is waiting in the queue.
-func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(RedisUniqueQueueConfiguration)
-
- byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
- if err != nil {
- return nil, err
- }
-
- if len(byteFIFO.setName) == 0 {
- byteFIFO.setName = byteFIFO.queueName + "_unique"
- }
-
- byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
- if err != nil {
- return nil, err
- }
-
- queue := &RedisUniqueQueue{
- ByteFIFOUniqueQueue: byteFIFOQueue,
- }
-
- queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
-
- return queue, nil
-}
-
-var _ UniqueByteFIFO = &RedisUniqueByteFIFO{}
-
-// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
-type RedisUniqueByteFIFO struct {
- RedisByteFIFO
- setName string
-}
-
-// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
-type RedisUniqueByteFIFOConfiguration struct {
- RedisByteFIFOConfiguration
- SetName string
-}
-
-// NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
-func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
- internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
- if err != nil {
- return nil, err
- }
-
- fifo := &RedisUniqueByteFIFO{
- RedisByteFIFO: *internal,
- setName: config.SetName,
- }
-
- return fifo, nil
-}
-
-// PushFunc pushes data to the end of the fifo and calls the callback if it is added
-func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
- added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
- if err != nil {
- return err
- }
- if added == 0 {
- return ErrAlreadyInQueue
- }
- if fn != nil {
- if err := fn(); err != nil {
- return err
- }
- }
- return fifo.client.RPush(ctx, fifo.queueName, data).Err()
-}
-
-// PushBack pushes data to the top of the fifo
-func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error {
- added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
- if err != nil {
- return err
- }
- if added == 0 {
- return ErrAlreadyInQueue
- }
- return fifo.client.LPush(ctx, fifo.queueName, data).Err()
-}
-
-// Pop pops data from the start of the fifo
-func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
- data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
- if err != nil && err != redis.Nil {
- return data, err
- }
-
- if len(data) == 0 {
- return data, nil
- }
-
- err = fifo.client.SRem(ctx, fifo.setName, data).Err()
- return data, err
-}
-
-// Has returns whether the fifo contains this data
-func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
- return fifo.client.SIsMember(ctx, fifo.setName, data).Result()
-}
-
-func init() {
- queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
-}
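
Side note on the removed Redis FIFO above: the SAdd guard and the RPush/LPop/SRem calls are issued as separate commands, so uniqueness is only best-effort if a failure happens between them. If stronger atomicity were ever wanted, the same set-plus-list pattern could be collapsed into a single server-side script. A rough sketch with go-redis v9 follows; it is illustrative only and not part of this change, and the key names and function are hypothetical (assumes the context and github.com/redis/go-redis/v9 imports already used by this file):

// Hypothetical atomic variant of PushFunc: add to the guard set and the list in one Lua script.
// Returns false if the item was already queued.
var uniquePushScript = redis.NewScript(`
if redis.call("SADD", KEYS[1], ARGV[1]) == 0 then
  return 0
end
redis.call("RPUSH", KEYS[2], ARGV[1])
return 1
`)

func atomicUniquePush(ctx context.Context, c *redis.Client, setKey, queueKey string, data []byte) (bool, error) {
	n, err := uniquePushScript.Run(ctx, c, []string{setKey, queueKey}, data).Int()
	return n == 1, err
}
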
diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go
deleted file mode 100644
index 22eeb75c40..0000000000
--- a/modules/queue/unique_queue_wrapped.go
+++ /dev/null
@@ -1,174 +0,0 @@
-// Copyright 2020 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "fmt"
- "sync"
- "time"
-)
-
-// WrappedUniqueQueueType is the type for a wrapped delayed starting queue
-const WrappedUniqueQueueType Type = "unique-wrapped"
-
-// WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue
-type WrappedUniqueQueueConfiguration struct {
- Underlying Type
- Timeout time.Duration
- MaxAttempts int
- Config interface{}
- QueueLength int
- Name string
-}
-
-// WrappedUniqueQueue wraps a delayed starting unique queue
-type WrappedUniqueQueue struct {
- *WrappedQueue
- table map[Data]bool
- tlock sync.Mutex
- ready bool
-}
-
-// NewWrappedUniqueQueue will attempt to create a unique queue of the provided type,
-// but if there is a problem creating this queue it will instead create
-// a WrappedUniqueQueue with delayed startup of the queue instead and a
-// channel which will be redirected to the queue
-//
-// Please note that this Queue does not guarantee that a particular
-// task cannot be processed twice or more at the same time. Uniqueness is
-// only guaranteed whilst the task is waiting in the queue.
-func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
- configInterface, err := toConfig(WrappedUniqueQueueConfiguration{}, cfg)
- if err != nil {
- return nil, err
- }
- config := configInterface.(WrappedUniqueQueueConfiguration)
-
- queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
- if err == nil {
- // Just return the queue there is no need to wrap
- return queue, nil
- }
- if IsErrInvalidConfiguration(err) {
- // Retrying ain't gonna make this any better...
- return nil, ErrInvalidConfiguration{cfg: cfg}
- }
-
- wrapped := &WrappedUniqueQueue{
- WrappedQueue: &WrappedQueue{
- channel: make(chan Data, config.QueueLength),
- exemplar: exemplar,
- delayedStarter: delayedStarter{
- cfg: config.Config,
- underlying: config.Underlying,
- timeout: config.Timeout,
- maxAttempts: config.MaxAttempts,
- name: config.Name,
- },
- },
- table: map[Data]bool{},
- }
-
- // wrapped.handle is passed to the delayedStarting internal queue and is run to handle
-	// data passed to it
- wrapped.handle = func(data ...Data) (unhandled []Data) {
- for _, datum := range data {
- wrapped.tlock.Lock()
- if !wrapped.ready {
-				delete(wrapped.table, datum)
- // If our table is empty all of the requests we have buffered between the
- // wrapper queue starting and the internal queue starting have been handled.
- // We can stop buffering requests in our local table and just pass Push
- // direct to the internal queue
- if len(wrapped.table) == 0 {
- wrapped.ready = true
- }
- }
- wrapped.tlock.Unlock()
- if u := handle(datum); u != nil {
- unhandled = append(unhandled, u...)
- }
- }
- return unhandled
- }
- _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar)
- return wrapped, nil
-}
-
-// Push will push the data to the internal channel checking it against the exemplar
-func (q *WrappedUniqueQueue) Push(data Data) error {
- return q.PushFunc(data, nil)
-}
-
-// PushFunc will push the data to the internal channel checking it against the exemplar
-func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
- if !assignableTo(data, q.exemplar) {
- return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
- }
-
- q.tlock.Lock()
- if q.ready {
- // ready means our table is empty and all of the requests we have buffered between the
- // wrapper queue starting and the internal queue starting have been handled.
- // We can stop buffering requests in our local table and just pass Push
- // direct to the internal queue
- q.tlock.Unlock()
- return q.internal.(UniqueQueue).PushFunc(data, fn)
- }
-
- locked := true
- defer func() {
- if locked {
- q.tlock.Unlock()
- }
- }()
- if _, ok := q.table[data]; ok {
- return ErrAlreadyInQueue
- }
- // FIXME: We probably need to implement some sort of limit here
- // If the downstream queue blocks this table will grow without limit
- q.table[data] = true
- if fn != nil {
- err := fn()
- if err != nil {
- delete(q.table, data)
- return err
- }
- }
- locked = false
- q.tlock.Unlock()
-
- q.channel <- data
- return nil
-}
-
-// Has checks if the data is in the queue
-func (q *WrappedUniqueQueue) Has(data Data) (bool, error) {
- q.tlock.Lock()
- defer q.tlock.Unlock()
- if q.ready {
- return q.internal.(UniqueQueue).Has(data)
- }
- _, has := q.table[data]
- return has, nil
-}
-
-// IsEmpty checks whether the queue is empty
-func (q *WrappedUniqueQueue) IsEmpty() bool {
- q.tlock.Lock()
- if len(q.table) > 0 {
- q.tlock.Unlock()
- return false
- }
- if q.ready {
- q.tlock.Unlock()
- return q.internal.IsEmpty()
- }
- q.tlock.Unlock()
- return false
-}
-
-func init() {
- queuesMap[WrappedUniqueQueueType] = NewWrappedUniqueQueue
-}
diff --git a/modules/queue/workergroup.go b/modules/queue/workergroup.go
new file mode 100644
index 0000000000..7127ea1117
--- /dev/null
+++ b/modules/queue/workergroup.go
@@ -0,0 +1,331 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "code.gitea.io/gitea/modules/log"
+)
+
+var (
+ infiniteTimerC = make(chan time.Time)
+ batchDebounceDuration = 100 * time.Millisecond
+ workerIdleDuration = 1 * time.Second
+
+ unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
+)
+
+func init() {
+ unhandledItemRequeueDuration.Store(int64(5 * time.Second))
+}
+
+// workerGroup is a group of workers to work with a WorkerPoolQueue
+type workerGroup[T any] struct {
+ q *WorkerPoolQueue[T]
+ wg sync.WaitGroup
+
+ ctxWorker context.Context
+ ctxWorkerCancel context.CancelFunc
+
+ batchBuffer []T
+ popItemChan chan []byte
+ popItemErr chan error
+}
+
+func (wg *workerGroup[T]) doPrepareWorkerContext() {
+ wg.ctxWorker, wg.ctxWorkerCancel = context.WithCancel(wg.q.ctxRun)
+}
+
+// doDispatchBatchToWorker dispatches a batch of items to worker's channel.
+// If the channel is full, it tries to start a new worker if possible.
+func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushChan chan flushType) {
+ batch := wg.batchBuffer
+ wg.batchBuffer = nil
+
+ if len(batch) == 0 {
+ return
+ }
+
+ full := false
+ select {
+ case q.batchChan <- batch:
+ default:
+ full = true
+ }
+
+ q.workerNumMu.Lock()
+ noWorker := q.workerNum == 0
+ if full || noWorker {
+ if q.workerNum < q.workerMaxNum || noWorker && q.workerMaxNum <= 0 {
+ q.workerNum++
+ q.doStartNewWorker(wg)
+ }
+ }
+ q.workerNumMu.Unlock()
+
+ if full {
+ select {
+ case q.batchChan <- batch:
+ case flush := <-flushChan:
+ q.doWorkerHandle(batch)
+ q.doFlush(wg, flush)
+ case <-q.ctxRun.Done():
+ wg.batchBuffer = batch // return the batch to buffer, the "doRun" function will handle it
+ }
+ }
+}
+
+// doWorkerHandle calls the safeHandler to handle a batch of items, and it increases/decreases the active worker number.
+// If the context has been canceled, it should not be called, because "Push" still needs the context; in such a case, call q.safeHandler directly.
+func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
+ q.workerNumMu.Lock()
+ q.workerActiveNum++
+ q.workerNumMu.Unlock()
+
+ defer func() {
+ q.workerNumMu.Lock()
+ q.workerActiveNum--
+ q.workerNumMu.Unlock()
+ }()
+
+ unhandled := q.safeHandler(batch...)
+	// if none of the items were handled, back off for a few seconds;
+	// in this case the handler (e.g. the document indexer) may have encountered some errors/failures
+ if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
+ log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
+ select {
+ case <-q.ctxRun.Done():
+ case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
+ }
+ }
+ for _, item := range unhandled {
+ if err := q.Push(item); err != nil {
+ if !q.basePushForShutdown(item) {
+ log.Error("Failed to requeue item for queue %q when calling handler: %v", q.GetName(), err)
+ }
+ }
+ }
+}
+
+// basePushForShutdown tries to requeue items into the base queue when the WorkerPoolQueue is shutting down.
+// If the queue is shutting down, it returns true and tries to push the items.
+// Otherwise it does nothing and returns false.
+func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
+ ctxShutdown := q.ctxShutdown.Load()
+ if ctxShutdown == nil {
+ return false
+ }
+ for _, item := range items {
+		// if pushing still fails, there is nothing more the queue can do except log the error; the item may be lost
+ if err := q.baseQueue.PushItem(*ctxShutdown, q.marshal(item)); err != nil {
+ log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
+ }
+ }
+ return true
+}
+
+// doStartNewWorker starts a new worker for the queue; the worker reads batches from the queue's batch channel and handles them.
+func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
+ wp.wg.Add(1)
+
+ go func() {
+ defer wp.wg.Done()
+
+ log.Debug("Queue %q starts new worker", q.GetName())
+ defer log.Debug("Queue %q stops idle worker", q.GetName())
+
+ t := time.NewTicker(workerIdleDuration)
+ keepWorking := true
+ stopWorking := func() {
+ q.workerNumMu.Lock()
+ keepWorking = false
+ q.workerNum--
+ q.workerNumMu.Unlock()
+ }
+ for keepWorking {
+ select {
+ case <-wp.ctxWorker.Done():
+ stopWorking()
+ case batch, ok := <-q.batchChan:
+ if !ok {
+ stopWorking()
+ } else {
+ q.doWorkerHandle(batch)
+ t.Reset(workerIdleDuration)
+ }
+ case <-t.C:
+ q.workerNumMu.Lock()
+ keepWorking = q.workerNum <= 1
+ if !keepWorking {
+ q.workerNum--
+ }
+ q.workerNumMu.Unlock()
+ }
+ }
+ }()
+}
+
+// doFlush flushes the queue: it tries to read all items from the queue and handle them.
+// It is for testing purposes only; it is not designed to work in a cluster.
+func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
+ log.Debug("Queue %q starts flushing", q.GetName())
+ defer log.Debug("Queue %q finishes flushing", q.GetName())
+
+ // stop all workers, and prepare a new worker context to start new workers
+
+ wg.ctxWorkerCancel()
+ wg.wg.Wait()
+
+ defer func() {
+ close(flush)
+ wg.doPrepareWorkerContext()
+ }()
+
+ // drain the batch channel first
+loop:
+ for {
+ select {
+ case batch := <-q.batchChan:
+ q.doWorkerHandle(batch)
+ default:
+ break loop
+ }
+ }
+
+ // drain the popItem channel
+ emptyCounter := 0
+ for {
+ select {
+ case data, dataOk := <-wg.popItemChan:
+ if !dataOk {
+ return
+ }
+ emptyCounter = 0
+ if v, jsonOk := q.unmarshal(data); !jsonOk {
+ continue
+ } else {
+ q.doWorkerHandle([]T{v})
+ }
+ case err := <-wg.popItemErr:
+ if !q.isCtxRunCanceled() {
+ log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
+ }
+ return
+ case <-q.ctxRun.Done():
+ log.Debug("Queue %q is shutting down", q.GetName())
+ return
+ case <-time.After(20 * time.Millisecond):
+			// There is no reliable way to make sure all queue items are consumed by the Flush: there may always be some items held in buffers or temporary variables.
+			// If Gitea runs in a cluster, we cannot even guarantee that all items are consumed by a particular instance.
+			// Luckily, the "Flush" trick is only used in tests, so this is good enough for now.
+ if cnt, _ := q.baseQueue.Len(q.ctxRun); cnt == 0 && len(wg.popItemChan) == 0 {
+ emptyCounter++
+ }
+ if emptyCounter >= 2 {
+ return
+ }
+ }
+ }
+}
+
+func (q *WorkerPoolQueue[T]) isCtxRunCanceled() bool {
+ select {
+ case <-q.ctxRun.Done():
+ return true
+ default:
+ return false
+ }
+}
+
+var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip reading other flush requests
+
+// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
+func (q *WorkerPoolQueue[T]) doRun() {
+ log.Debug("Queue %q starts running", q.GetName())
+ defer log.Debug("Queue %q stops running", q.GetName())
+
+ wg := &workerGroup[T]{q: q}
+ wg.doPrepareWorkerContext()
+ wg.popItemChan, wg.popItemErr = popItemByChan(q.ctxRun, q.baseQueue.PopItem)
+
+ defer func() {
+ q.ctxRunCancel()
+
+ // drain all data on the fly
+ // since the queue is shutting down, the items can't be dispatched to workers because the context is canceled
+ // it can't call doWorkerHandle either, because there is no chance to push unhandled items back to the queue
+ var unhandled []T
+ close(q.batchChan)
+ for batch := range q.batchChan {
+ unhandled = append(unhandled, batch...)
+ }
+ unhandled = append(unhandled, wg.batchBuffer...)
+ for data := range wg.popItemChan {
+ if v, ok := q.unmarshal(data); ok {
+ unhandled = append(unhandled, v)
+ }
+ }
+
+ ctxShutdownPtr := q.ctxShutdown.Load()
+ if ctxShutdownPtr != nil {
+ // if there is a shutdown context, try to push the items back to the base queue
+ q.basePushForShutdown(unhandled...)
+ workerDone := make(chan struct{})
+			// waiting on the WaitGroup is the only way to wait for the workers, because the handlers do not have a context to wait on
+ go func() { wg.wg.Wait(); close(workerDone) }()
+ select {
+ case <-workerDone:
+ case <-(*ctxShutdownPtr).Done():
+ log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
+ }
+ } else {
+			// if there is no shutdown context, just call the handler to try to handle the items. If the handler fails again, the items are lost
+ q.safeHandler(unhandled...)
+ }
+
+ close(q.shutdownDone)
+ }()
+
+ var batchDispatchC <-chan time.Time = infiniteTimerC
+ for {
+ select {
+ case data, dataOk := <-wg.popItemChan:
+ if !dataOk {
+ return
+ }
+ if v, jsonOk := q.unmarshal(data); !jsonOk {
+ testRecorder.Record("pop:corrupted:%s", data) // in rare cases the levelqueue(leveldb) might be corrupted
+ continue
+ } else {
+ wg.batchBuffer = append(wg.batchBuffer, v)
+ }
+ if len(wg.batchBuffer) >= q.batchLength {
+ q.doDispatchBatchToWorker(wg, q.flushChan)
+ } else if batchDispatchC == infiniteTimerC {
+ batchDispatchC = time.After(batchDebounceDuration)
+ } // else: batchDispatchC is already a debounce timer, it will be triggered soon
+ case <-batchDispatchC:
+ batchDispatchC = infiniteTimerC
+ q.doDispatchBatchToWorker(wg, q.flushChan)
+ case flush := <-q.flushChan:
+			// before flushing, it needs to try to dispatch the batch to a worker first, in case there is no worker running
+			// after the flushing, there is at least one worker running, so "doFlush" can wait for the workers to finish
+			// since we are already in a "flush" operation, the dispatching function shouldn't read the flush chan
+ q.doDispatchBatchToWorker(wg, skipFlushChan)
+ q.doFlush(wg, flush)
+ case err := <-wg.popItemErr:
+ if !q.isCtxRunCanceled() {
+ log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
+ }
+ return
+ case <-q.ctxRun.Done():
+ log.Debug("Queue %q is shutting down", q.GetName())
+ return
+ }
+ }
+}
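
The doRun loop above relies on a small debounce trick: batchDispatchC normally points at infiniteTimerC, a channel that never fires, and is swapped for a time.After timer only when the first item of a pending batch arrives. Isolated from the queue, the pattern looks roughly like this minimal sketch; the names, the string item type, and the missing batch-size cap are illustrative assumptions, not part of this change:

// Sketch of the batch-debounce pattern used by doRun: the timer case is effectively
// disabled while debounceC still points at the never-firing channel.
var neverC = make(chan time.Time) // never written to, never closed

func consumeBatched(items <-chan string, debounce time.Duration, handle func([]string)) {
	var buf []string
	debounceC := (<-chan time.Time)(neverC)
	for {
		select {
		case it, ok := <-items:
			if !ok {
				if len(buf) > 0 {
					handle(buf) // flush whatever is left before returning
				}
				return
			}
			buf = append(buf, it)
			if debounceC == neverC {
				debounceC = time.After(debounce) // arm the timer once per pending batch
			}
		case <-debounceC:
			debounceC = neverC
			handle(buf)
			buf = nil
		}
	}
}
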
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
deleted file mode 100644
index b32128cb82..0000000000
--- a/modules/queue/workerpool.go
+++ /dev/null
@@ -1,613 +0,0 @@
-// Copyright 2019 The Gitea Authors. All rights reserved.
-// SPDX-License-Identifier: MIT
-
-package queue
-
-import (
- "context"
- "fmt"
- "runtime/pprof"
- "sync"
- "sync/atomic"
- "time"
-
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/process"
- "code.gitea.io/gitea/modules/util"
-)
-
-// WorkerPool represent a dynamically growable worker pool for a
-// provided handler function. They have an internal channel which
-// they use to detect if there is a block and will grow and shrink in
-// response to demand as per configuration.
-type WorkerPool struct {
- // This field requires to be the first one in the struct.
- // This is to allow 64 bit atomic operations on 32-bit machines.
- // See: https://pkg.go.dev/sync/atomic#pkg-note-BUG & Gitea issue 19518
- numInQueue int64
- lock sync.Mutex
- baseCtx context.Context
- baseCtxCancel context.CancelFunc
- baseCtxFinished process.FinishedFunc
- paused chan struct{}
- resumed chan struct{}
- cond *sync.Cond
- qid int64
- maxNumberOfWorkers int
- numberOfWorkers int
- batchLength int
- handle HandlerFunc
- dataChan chan Data
- blockTimeout time.Duration
- boostTimeout time.Duration
- boostWorkers int
-}
-
-var (
- _ Flushable = &WorkerPool{}
- _ ManagedPool = &WorkerPool{}
-)
-
-// WorkerPoolConfiguration is the basic configuration for a WorkerPool
-type WorkerPoolConfiguration struct {
- Name string
- QueueLength int
- BatchLength int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
- MaxWorkers int
-}
-
-// NewWorkerPool creates a new worker pool
-func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
- ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), fmt.Sprintf("Queue: %s", config.Name), process.SystemProcessType, false)
-
- dataChan := make(chan Data, config.QueueLength)
- pool := &WorkerPool{
- baseCtx: ctx,
- baseCtxCancel: cancel,
- baseCtxFinished: finished,
- batchLength: config.BatchLength,
- dataChan: dataChan,
- resumed: closedChan,
- paused: make(chan struct{}),
- handle: handle,
- blockTimeout: config.BlockTimeout,
- boostTimeout: config.BoostTimeout,
- boostWorkers: config.BoostWorkers,
- maxNumberOfWorkers: config.MaxWorkers,
- }
-
- return pool
-}
-
-// Done returns when this worker pool's base context has been cancelled
-func (p *WorkerPool) Done() <-chan struct{} {
- return p.baseCtx.Done()
-}
-
-// Push pushes the data to the internal channel
-func (p *WorkerPool) Push(data Data) {
- atomic.AddInt64(&p.numInQueue, 1)
- p.lock.Lock()
- select {
- case <-p.paused:
- p.lock.Unlock()
- p.dataChan <- data
- return
- default:
- }
-
- if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
- if p.numberOfWorkers == 0 {
- p.zeroBoost()
- } else {
- p.lock.Unlock()
- }
- p.pushBoost(data)
- } else {
- p.lock.Unlock()
- p.dataChan <- data
- }
-}
-
-// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting
-func (p *WorkerPool) HasNoWorkerScaling() bool {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.hasNoWorkerScaling()
-}
-
-func (p *WorkerPool) hasNoWorkerScaling() bool {
- return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
-}
-
-// zeroBoost will add a temporary boost worker for a no worker queue
-// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
-// (This is because addWorkers has to be called whilst unlocked)
-func (p *WorkerPool) zeroBoost() {
- ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
- mq := GetManager().GetManagedQueue(p.qid)
- boost := p.boostWorkers
- if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
- boost = p.maxNumberOfWorkers - p.numberOfWorkers
- }
- if mq != nil {
- log.Debug("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
-
- start := time.Now()
- pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
- cancel = func() {
- mq.RemoveWorkers(pid)
- }
- } else {
- log.Debug("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
- }
- p.lock.Unlock()
- p.addWorkers(ctx, cancel, boost)
-}
-
-func (p *WorkerPool) pushBoost(data Data) {
- select {
- case p.dataChan <- data:
- default:
- p.lock.Lock()
- if p.blockTimeout <= 0 {
- p.lock.Unlock()
- p.dataChan <- data
- return
- }
- ourTimeout := p.blockTimeout
- timer := time.NewTimer(p.blockTimeout)
- p.lock.Unlock()
- select {
- case p.dataChan <- data:
- util.StopTimer(timer)
- case <-timer.C:
- p.lock.Lock()
- if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) {
- p.lock.Unlock()
- p.dataChan <- data
- return
- }
- p.blockTimeout *= 2
- boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx)
- mq := GetManager().GetManagedQueue(p.qid)
- boost := p.boostWorkers
- if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
- boost = p.maxNumberOfWorkers - p.numberOfWorkers
- }
- if mq != nil {
- log.Debug("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
-
- start := time.Now()
- pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false)
- go func() {
- <-boostCtx.Done()
- mq.RemoveWorkers(pid)
- boostCtxCancel()
- }()
- } else {
- log.Debug("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
- }
- go func() {
- <-time.After(p.boostTimeout)
- boostCtxCancel()
- p.lock.Lock()
- p.blockTimeout /= 2
- p.lock.Unlock()
- }()
- p.lock.Unlock()
- p.addWorkers(boostCtx, boostCtxCancel, boost)
- p.dataChan <- data
- }
- }
-}
-
-// NumberOfWorkers returns the number of current workers in the pool
-func (p *WorkerPool) NumberOfWorkers() int {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.numberOfWorkers
-}
-
-// NumberInQueue returns the number of items in the queue
-func (p *WorkerPool) NumberInQueue() int64 {
- return atomic.LoadInt64(&p.numInQueue)
-}
-
-// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
-func (p *WorkerPool) MaxNumberOfWorkers() int {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.maxNumberOfWorkers
-}
-
-// BoostWorkers returns the number of workers for a boost
-func (p *WorkerPool) BoostWorkers() int {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.boostWorkers
-}
-
-// BoostTimeout returns the timeout of the next boost
-func (p *WorkerPool) BoostTimeout() time.Duration {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.boostTimeout
-}
-
-// BlockTimeout returns the timeout til the next boost
-func (p *WorkerPool) BlockTimeout() time.Duration {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.blockTimeout
-}
-
-// SetPoolSettings sets the setable boost values
-func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.maxNumberOfWorkers = maxNumberOfWorkers
- p.boostWorkers = boostWorkers
- p.boostTimeout = timeout
-}
-
-// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool
-// Changing this number will not change the number of current workers but will change the limit
-// for future additions
-func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.maxNumberOfWorkers = newMax
-}
-
-func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) {
- var ctx context.Context
- var cancel context.CancelFunc
- start := time.Now()
- end := start
- hasTimeout := false
- if timeout > 0 {
- ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
- end = start.Add(timeout)
- hasTimeout = true
- } else {
- ctx, cancel = context.WithCancel(p.baseCtx)
- }
-
- mq := GetManager().GetManagedQueue(p.qid)
- if mq != nil {
- pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
- log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
- return ctx, func() {
- mq.RemoveWorkers(pid)
- }
- }
- log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
-
- return ctx, cancel
-}
-
-// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
-func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
- ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
- p.addWorkers(ctx, cancel, number)
- return cancel
-}
-
-// addWorkers adds workers to the pool
-func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) {
- for i := 0; i < number; i++ {
- p.lock.Lock()
- if p.cond == nil {
- p.cond = sync.NewCond(&p.lock)
- }
- p.numberOfWorkers++
- p.lock.Unlock()
- go func() {
- pprof.SetGoroutineLabels(ctx)
- p.doWork(ctx)
-
- p.lock.Lock()
- p.numberOfWorkers--
- if p.numberOfWorkers == 0 {
- p.cond.Broadcast()
- cancel()
- } else if p.numberOfWorkers < 0 {
- // numberOfWorkers can't go negative but...
- log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
- p.numberOfWorkers = 0
- p.cond.Broadcast()
- cancel()
- }
- select {
- case <-p.baseCtx.Done():
- // Don't warn or check for ongoing work if the baseCtx is shutdown
- case <-p.paused:
- // Don't warn or check for ongoing work if the pool is paused
- default:
- if p.hasNoWorkerScaling() {
- log.Warn(
- "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
- "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
- p.pause()
- } else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
- // OK there are no workers but... there's still work to be done -> Reboost
- p.zeroBoost()
- // p.lock will be unlocked by zeroBoost
- return
- }
- }
- p.lock.Unlock()
- }()
- }
-}
-
-// Wait for WorkerPool to finish
-func (p *WorkerPool) Wait() {
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.cond == nil {
- p.cond = sync.NewCond(&p.lock)
- }
- if p.numberOfWorkers <= 0 {
- return
- }
- p.cond.Wait()
-}
-
-// IsPaused returns if the pool is paused
-func (p *WorkerPool) IsPaused() bool {
- p.lock.Lock()
- defer p.lock.Unlock()
- select {
- case <-p.paused:
- return true
- default:
- return false
- }
-}
-
-// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed
-func (p *WorkerPool) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.paused, p.resumed
-}
-
-// Pause pauses the WorkerPool
-func (p *WorkerPool) Pause() {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.pause()
-}
-
-func (p *WorkerPool) pause() {
- select {
- case <-p.paused:
- default:
- p.resumed = make(chan struct{})
- close(p.paused)
- }
-}
-
-// Resume resumes the WorkerPool
-func (p *WorkerPool) Resume() {
- p.lock.Lock() // can't defer unlock because of the zeroBoost at the end
- select {
- case <-p.resumed:
- // already resumed - there's nothing to do
- p.lock.Unlock()
- return
- default:
- }
-
- p.paused = make(chan struct{})
- close(p.resumed)
-
- // OK now we need to check if we need to add some workers...
- if p.numberOfWorkers > 0 || p.hasNoWorkerScaling() || atomic.LoadInt64(&p.numInQueue) == 0 {
- // We either have workers, can't scale or there's no work to be done -> so just resume
- p.lock.Unlock()
- return
- }
-
- // OK we got some work but no workers we need to think about boosting
- select {
- case <-p.baseCtx.Done():
- // don't bother boosting if the baseCtx is done
- p.lock.Unlock()
- return
- default:
- }
-
- // OK we'd better add some boost workers!
- p.zeroBoost()
- // p.zeroBoost will unlock the lock
-}
-
-// CleanUp will drain the remaining contents of the channel
-// This should be called after AddWorkers context is closed
-func (p *WorkerPool) CleanUp(ctx context.Context) {
- log.Trace("WorkerPool: %d CleanUp", p.qid)
- close(p.dataChan)
- for data := range p.dataChan {
- if unhandled := p.handle(data); unhandled != nil {
- if unhandled != nil {
- log.Error("Unhandled Data in clean-up of queue %d", p.qid)
- }
- }
-
- atomic.AddInt64(&p.numInQueue, -1)
- select {
- case <-ctx.Done():
- log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
- return
- default:
- }
- }
- log.Trace("WorkerPool: %d CleanUp Done", p.qid)
-}
-
-// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
-func (p *WorkerPool) Flush(timeout time.Duration) error {
- ctx, cancel := p.commonRegisterWorkers(1, timeout, true)
- defer cancel()
- return p.FlushWithContext(ctx)
-}
-
-// IsEmpty returns if true if the worker queue is empty
-func (p *WorkerPool) IsEmpty() bool {
- return atomic.LoadInt64(&p.numInQueue) == 0
-}
-
-// contextError returns either ctx.Done(), the base context's error or nil
-func (p *WorkerPool) contextError(ctx context.Context) error {
- select {
- case <-p.baseCtx.Done():
- return p.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- return nil
- }
-}
-
-// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
-// NB: The worker will not be registered with the manager.
-func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
- log.Trace("WorkerPool: %d Flush", p.qid)
- paused, _ := p.IsPausedIsResumed()
- for {
-		// Because select will return any case that is satisfied at random, we precheck here before looking at dataChan.
- select {
- case <-paused:
- // Ensure that even if paused that the cancelled error is still sent
- return p.contextError(ctx)
- case <-p.baseCtx.Done():
- return p.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
-
- select {
- case <-paused:
- return p.contextError(ctx)
- case data, ok := <-p.dataChan:
- if !ok {
- return nil
- }
- if unhandled := p.handle(data); unhandled != nil {
- log.Error("Unhandled Data whilst flushing queue %d", p.qid)
- }
- atomic.AddInt64(&p.numInQueue, -1)
- case <-p.baseCtx.Done():
- return p.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- return nil
- }
- }
-}
-
-func (p *WorkerPool) doWork(ctx context.Context) {
- pprof.SetGoroutineLabels(ctx)
- delay := time.Millisecond * 300
-
- // Create a common timer - we will use this elsewhere
- timer := time.NewTimer(0)
- util.StopTimer(timer)
-
- paused, _ := p.IsPausedIsResumed()
- data := make([]Data, 0, p.batchLength)
- for {
-		// Because select will return any case that is satisfied at random, we precheck here before looking at dataChan.
- select {
- case <-paused:
- log.Trace("Worker for Queue %d Pausing", p.qid)
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- if unhandled := p.handle(data...); unhandled != nil {
- log.Error("Unhandled Data in queue %d", p.qid)
- }
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- _, resumed := p.IsPausedIsResumed()
- select {
- case <-resumed:
- paused, _ = p.IsPausedIsResumed()
- log.Trace("Worker for Queue %d Resuming", p.qid)
- util.StopTimer(timer)
- case <-ctx.Done():
- log.Trace("Worker shutting down")
- return
- }
- case <-ctx.Done():
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- if unhandled := p.handle(data...); unhandled != nil {
- log.Error("Unhandled Data in queue %d", p.qid)
- }
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- log.Trace("Worker shutting down")
- return
- default:
- }
-
- select {
- case <-paused:
- // go back around
- case <-ctx.Done():
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- if unhandled := p.handle(data...); unhandled != nil {
- log.Error("Unhandled Data in queue %d", p.qid)
- }
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- log.Trace("Worker shutting down")
- return
- case datum, ok := <-p.dataChan:
- if !ok {
- // the dataChan has been closed - we should finish up:
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- if unhandled := p.handle(data...); unhandled != nil {
- log.Error("Unhandled Data in queue %d", p.qid)
- }
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- log.Trace("Worker shutting down")
- return
- }
- data = append(data, datum)
- util.StopTimer(timer)
-
- if len(data) >= p.batchLength {
- log.Trace("Handling: %d data, %v", len(data), data)
- if unhandled := p.handle(data...); unhandled != nil {
- log.Error("Unhandled Data in queue %d", p.qid)
- }
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- data = make([]Data, 0, p.batchLength)
- } else {
- timer.Reset(delay)
- }
- case <-timer.C:
- delay = time.Millisecond * 100
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- if unhandled := p.handle(data...); unhandled != nil {
- log.Error("Unhandled Data in queue %d", p.qid)
- }
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- data = make([]Data, 0, p.batchLength)
- }
- }
- }
-}
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
new file mode 100644
index 0000000000..493bea17aa
--- /dev/null
+++ b/modules/queue/workerqueue.go
@@ -0,0 +1,241 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/json"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/setting"
+)
+
+// WorkerPoolQueue is a queue that uses a pool of workers to process items
+// It can use different underlying (base) queue types
+type WorkerPoolQueue[T any] struct {
+ ctxRun context.Context
+ ctxRunCancel context.CancelFunc
+ ctxShutdown atomic.Pointer[context.Context]
+ shutdownDone chan struct{}
+
+ origHandler HandlerFuncT[T]
+ safeHandler HandlerFuncT[T]
+
+ baseQueueType string
+ baseConfig *BaseConfig
+ baseQueue baseQueue
+
+ batchChan chan []T
+ flushChan chan flushType
+
+ batchLength int
+ workerNum int
+ workerMaxNum int
+ workerActiveNum int
+ workerNumMu sync.Mutex
+}
+
+type flushType chan struct{}
+
+var _ ManagedWorkerPoolQueue = (*WorkerPoolQueue[any])(nil)
+
+func (q *WorkerPoolQueue[T]) GetName() string {
+ return q.baseConfig.ManagedName
+}
+
+func (q *WorkerPoolQueue[T]) GetType() string {
+ return q.baseQueueType
+}
+
+func (q *WorkerPoolQueue[T]) GetItemTypeName() string {
+ var t T
+ return fmt.Sprintf("%T", t)
+}
+
+func (q *WorkerPoolQueue[T]) GetWorkerNumber() int {
+ q.workerNumMu.Lock()
+ defer q.workerNumMu.Unlock()
+ return q.workerNum
+}
+
+func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int {
+ q.workerNumMu.Lock()
+ defer q.workerNumMu.Unlock()
+ return q.workerActiveNum
+}
+
+func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int {
+ q.workerNumMu.Lock()
+ defer q.workerNumMu.Unlock()
+ return q.workerMaxNum
+}
+
+func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int) {
+ q.workerNumMu.Lock()
+ defer q.workerNumMu.Unlock()
+ q.workerMaxNum = num
+}
+
+func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int {
+ cnt, err := q.baseQueue.Len(q.ctxRun)
+ if err != nil {
+ log.Error("Failed to get number of items in queue %q: %v", q.GetName(), err)
+ }
+ return cnt
+}
+
+func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) (err error) {
+ if q.isBaseQueueDummy() {
+ return
+ }
+
+ log.Debug("Try to flush queue %q with timeout %v", q.GetName(), timeout)
+ defer log.Debug("Finish flushing queue %q, err: %v", q.GetName(), err)
+
+ var after <-chan time.Time
+ after = infiniteTimerC
+ if timeout > 0 {
+ after = time.After(timeout)
+ }
+ c := make(flushType)
+
+ // send flush request
+ // if it blocks, it means that there is a flush in progress or the queue hasn't been started yet
+ select {
+ case q.flushChan <- c:
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-q.ctxRun.Done():
+ return q.ctxRun.Err()
+ case <-after:
+ return context.DeadlineExceeded
+ }
+
+ // wait for flush to finish
+ select {
+ case <-c:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-q.ctxRun.Done():
+ return q.ctxRun.Err()
+ case <-after:
+ return context.DeadlineExceeded
+ }
+}
+
+func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
+ bs, err := json.Marshal(data)
+ if err != nil {
+ log.Error("Failed to marshal item for queue %q: %v", q.GetName(), err)
+ return nil
+ }
+ return bs
+}
+
+func (q *WorkerPoolQueue[T]) unmarshal(data []byte) (t T, ok bool) {
+ if err := json.Unmarshal(data, &t); err != nil {
+ log.Error("Failed to unmarshal item from queue %q: %v", q.GetName(), err)
+ return t, false
+ }
+ return t, true
+}
+
+func (q *WorkerPoolQueue[T]) isBaseQueueDummy() bool {
+ _, isDummy := q.baseQueue.(*baseDummy)
+ return isDummy
+}
+
+// Push adds an item to the queue; it may block for a while and then return an error if the queue is full
+func (q *WorkerPoolQueue[T]) Push(data T) error {
+ if q.isBaseQueueDummy() && q.safeHandler != nil {
+ // FIXME: the "immediate" queue is only for testing, but it really causes problems because its behavior is different from a real queue.
+		// Even if tests pass, it doesn't mean that there are no bugs in the code.
+ if data, ok := q.unmarshal(q.marshal(data)); ok {
+ q.safeHandler(data)
+ }
+ }
+ return q.baseQueue.PushItem(q.ctxRun, q.marshal(data))
+}
+
+// Has only works for unique queues. Keep in mind that this check may not be reliable (due to the lack of proper transaction support),
+// so there is a small chance that duplicate items appear in the queue.
+func (q *WorkerPoolQueue[T]) Has(data T) (bool, error) {
+ return q.baseQueue.HasItem(q.ctxRun, q.marshal(data))
+}
+
+func (q *WorkerPoolQueue[T]) Run(atShutdown, atTerminate func(func())) {
+ atShutdown(func() {
+		// in case some queue handlers are slow or have hanging bugs, wait at most a short time
+ q.ShutdownWait(1 * time.Second)
+ })
+ q.doRun()
+}
+
+// ShutdownWait shuts down the queue, waits for all workers (handlers) to finish their jobs, and pushes the unhandled items back to the base queue.
+// In case some buggy handlers hang forever, a reasonable timeout is needed.
+func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
+ shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), timeout)
+ defer shutdownCtxCancel()
+ if q.ctxShutdown.CompareAndSwap(nil, &shutdownCtx) {
+ q.ctxRunCancel()
+ }
+ <-q.shutdownDone
+}
+
+func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQueue, error)) {
+ switch t {
+ case "dummy", "immediate":
+ return t, newBaseDummy
+ case "channel":
+ return t, newBaseChannelGeneric
+ case "redis":
+ return t, newBaseRedisGeneric
+ default: // level(leveldb,levelqueue,persistable-channel)
+ return "level", newBaseLevelQueueGeneric
+ }
+}
+
+func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
+ if handler == nil {
+ log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name)
+ queueSetting.Type = "dummy"
+ }
+
+ var w WorkerPoolQueue[T]
+ var err error
+ queueType, newQueueFn := getNewQueueFn(queueSetting.Type)
+ w.baseQueueType = queueType
+ w.baseConfig = toBaseConfig(name, queueSetting)
+ w.baseQueue, err = newQueueFn(w.baseConfig, unique)
+ if err != nil {
+ return nil, err
+ }
+ log.Trace("Created queue %q of type %q", name, queueType)
+
+ w.ctxRun, w.ctxRunCancel = context.WithCancel(graceful.GetManager().ShutdownContext())
+ w.batchChan = make(chan []T)
+ w.flushChan = make(chan flushType)
+ w.shutdownDone = make(chan struct{})
+ w.workerMaxNum = queueSetting.MaxWorkers
+ w.batchLength = queueSetting.BatchLength
+
+ w.origHandler = handler
+ w.safeHandler = func(t ...T) (unhandled []T) {
+ defer func() {
+ err := recover()
+ if err != nil {
+ log.Error("Recovered from panic in queue %q handler: %v\n%s", name, err, log.Stack(2))
+ }
+ }()
+ return w.origHandler(t...)
+ }
+
+ return &w, nil
+}
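Reviewer note (not part of the diff): a minimal, hypothetical usage sketch of the WorkerPoolQueue API added above. The queue name, settings, and handler are examples only, and the graceful wiring assumes the existing RunWithShutdownFns helper on the graceful manager.

	package example // sketch only

	import (
		"code.gitea.io/gitea/modules/graceful"
		"code.gitea.io/gitea/modules/log"
		"code.gitea.io/gitea/modules/queue"
		"code.gitea.io/gitea/modules/setting"
	)

	func initExampleQueue() error {
		// handler receives a batch of items; returning items marks them as unhandled so they are requeued later
		handler := func(items ...string) (unhandled []string) {
			for _, it := range items {
				log.Info("processing %q", it)
			}
			return nil
		}
		q, err := queue.NewWorkerPoolQueueBySetting("example",
			setting.QueueSettings{Type: "channel", Length: 100, BatchLength: 20, MaxWorkers: 2}, handler, false)
		if err != nil {
			return err
		}
		// assumption: the graceful manager exposes RunWithShutdownFns with a matching signature
		go graceful.GetManager().RunWithShutdownFns(q.Run)
		return q.Push("hello")
	}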
diff --git a/modules/queue/workerqueue_test.go b/modules/queue/workerqueue_test.go
new file mode 100644
index 0000000000..da9451cd77
--- /dev/null
+++ b/modules/queue/workerqueue_test.go
@@ -0,0 +1,260 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+ "context"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "code.gitea.io/gitea/modules/setting"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
+ var stop func()
+ started := make(chan struct{})
+ stopped := make(chan struct{})
+ go func() {
+ q.Run(func(f func()) { stop = f; close(started) }, nil)
+ close(stopped)
+ }()
+ <-started
+ return func() {
+ stop()
+ <-stopped
+ }
+}
+
+func TestWorkerPoolQueueUnhandled(t *testing.T) {
+ oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load()
+ unhandledItemRequeueDuration.Store(0)
+ defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration)
+
+ mu := sync.Mutex{}
+
+ test := func(t *testing.T, queueSetting setting.QueueSettings) {
+ queueSetting.Length = 100
+ queueSetting.Type = "channel"
+ queueSetting.Datadir = t.TempDir() + "/test-queue"
+ m := map[int]int{}
+
+ // odds are handled once, evens are handled twice
+ handler := func(items ...int) (unhandled []int) {
+ testRecorder.Record("handle:%v", items)
+ for _, item := range items {
+ mu.Lock()
+ if item%2 == 0 && m[item] == 0 {
+ unhandled = append(unhandled, item)
+ }
+ m[item]++
+ mu.Unlock()
+ }
+ return unhandled
+ }
+
+ q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", queueSetting, handler, false)
+ stop := runWorkerPoolQueue(q)
+ for i := 0; i < queueSetting.Length; i++ {
+ testRecorder.Record("push:%v", i)
+ assert.NoError(t, q.Push(i))
+ }
+ assert.NoError(t, q.FlushWithContext(context.Background(), 0))
+ stop()
+
+ ok := true
+ for i := 0; i < queueSetting.Length; i++ {
+ if i%2 == 0 {
+ ok = ok && assert.EqualValues(t, 2, m[i], "test %s: item %d", t.Name(), i)
+ } else {
+ ok = ok && assert.EqualValues(t, 1, m[i], "test %s: item %d", t.Name(), i)
+ }
+ }
+ if !ok {
+ t.Logf("m: %v", m)
+ t.Logf("records: %v", testRecorder.Records())
+ }
+ testRecorder.Reset()
+ }
+
+ runCount := 2 // these tests can be run hundreds of times to check their stability
+ t.Run("1/1", func(t *testing.T) {
+ for i := 0; i < runCount; i++ {
+ test(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1})
+ }
+ })
+ t.Run("3/1", func(t *testing.T) {
+ for i := 0; i < runCount; i++ {
+ test(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1})
+ }
+ })
+ t.Run("4/5", func(t *testing.T) {
+ for i := 0; i < runCount; i++ {
+ test(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5})
+ }
+ })
+}
+
+func TestWorkerPoolQueuePersistence(t *testing.T) {
+ runCount := 2 // these tests can be run hundreds of times to check their stability
+ t.Run("1/1", func(t *testing.T) {
+ for i := 0; i < runCount; i++ {
+ testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1, Length: 100})
+ }
+ })
+ t.Run("3/1", func(t *testing.T) {
+ for i := 0; i < runCount; i++ {
+ testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1, Length: 100})
+ }
+ })
+ t.Run("4/5", func(t *testing.T) {
+ for i := 0; i < runCount; i++ {
+ testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5, Length: 100})
+ }
+ })
+}
+
+func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSettings) {
+ testCount := queueSetting.Length
+ queueSetting.Type = "level"
+ queueSetting.Datadir = t.TempDir() + "/test-queue"
+
+ mu := sync.Mutex{}
+
+ var tasksQ1, tasksQ2 []string
+ q1 := func() {
+ startWhenAllReady := make(chan struct{}) // only start consuming data after all "testCount" tasks have been pushed into the queue
+ stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
+
+ testHandler := func(data ...string) []string {
+ <-startWhenAllReady
+ time.Sleep(10 * time.Millisecond)
+ for _, s := range data {
+ mu.Lock()
+ tasksQ1 = append(tasksQ1, s)
+ mu.Unlock()
+
+ if s == "task-20" {
+ close(stopAt20Shutdown)
+ }
+ }
+ return nil
+ }
+
+ q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+ stop := runWorkerPoolQueue(q)
+ for i := 0; i < testCount; i++ {
+ _ = q.Push("task-" + strconv.Itoa(i))
+ }
+ close(startWhenAllReady)
+ <-stopAt20Shutdown // it's possible to have more than 20 tasks executed
+ stop()
+ }
+
+ q1() // run some tasks and shutdown at an intermediate point
+
+ time.Sleep(100 * time.Millisecond) // because the handler in q1 has a slight delay, we need to wait for it to finish
+
+ q2 := func() {
+ testHandler := func(data ...string) []string {
+ for _, s := range data {
+ mu.Lock()
+ tasksQ2 = append(tasksQ2, s)
+ mu.Unlock()
+ }
+ return nil
+ }
+
+ q, _ := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
+ stop := runWorkerPoolQueue(q)
+ assert.NoError(t, q.FlushWithContext(context.Background(), 0))
+ stop()
+ }
+
+ q2() // restart the queue to continue to execute the tasks in it
+
+ assert.NotZero(t, len(tasksQ1))
+ assert.NotZero(t, len(tasksQ2))
+ assert.EqualValues(t, testCount, len(tasksQ1)+len(tasksQ2))
+}
+
+func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
+ oldWorkerIdleDuration := workerIdleDuration
+ workerIdleDuration = 300 * time.Millisecond
+ defer func() {
+ workerIdleDuration = oldWorkerIdleDuration
+ }()
+
+ handler := func(items ...int) (unhandled []int) {
+ time.Sleep(100 * time.Millisecond)
+ return nil
+ }
+
+ q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
+ stop := runWorkerPoolQueue(q)
+ for i := 0; i < 5; i++ {
+ assert.NoError(t, q.Push(i))
+ }
+
+ time.Sleep(50 * time.Millisecond)
+ assert.EqualValues(t, 1, q.GetWorkerNumber())
+ assert.EqualValues(t, 1, q.GetWorkerActiveNumber())
+ time.Sleep(500 * time.Millisecond)
+ assert.EqualValues(t, 1, q.GetWorkerNumber())
+ assert.EqualValues(t, 0, q.GetWorkerActiveNumber())
+ time.Sleep(workerIdleDuration)
+ assert.EqualValues(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
+ stop()
+
+ q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
+ stop = runWorkerPoolQueue(q)
+ for i := 0; i < 15; i++ {
+ assert.NoError(t, q.Push(i))
+ }
+
+ time.Sleep(50 * time.Millisecond)
+ assert.EqualValues(t, 3, q.GetWorkerNumber())
+ assert.EqualValues(t, 3, q.GetWorkerActiveNumber())
+ time.Sleep(500 * time.Millisecond)
+ assert.EqualValues(t, 3, q.GetWorkerNumber())
+ assert.EqualValues(t, 0, q.GetWorkerActiveNumber())
+ time.Sleep(workerIdleDuration)
+ assert.EqualValues(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
+ stop()
+}
+
+func TestWorkerPoolQueueShutdown(t *testing.T) {
+ oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load()
+ unhandledItemRequeueDuration.Store(int64(100 * time.Millisecond))
+ defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration)
+
+ // simulate a slow handler that doesn't handle any item (all items will be pushed back to the queue)
+ handlerCalled := make(chan struct{})
+ handler := func(items ...int) (unhandled []int) {
+ if items[0] == 0 {
+ close(handlerCalled)
+ }
+ time.Sleep(100 * time.Millisecond)
+ return items
+ }
+
+ qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20}
+ q, _ := NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+ stop := runWorkerPoolQueue(q)
+ for i := 0; i < qs.Length; i++ {
+ assert.NoError(t, q.Push(i))
+ }
+ <-handlerCalled
+ time.Sleep(50 * time.Millisecond) // wait for a while to make sure all workers are active
+ assert.EqualValues(t, 4, q.GetWorkerActiveNumber())
+ stop() // stop triggers shutdown
+ assert.EqualValues(t, 0, q.GetWorkerActiveNumber())
+
+ // no item was ever handled, so we still get all of them again
+ q, _ = NewWorkerPoolQueueBySetting("test-workpoolqueue", qs, handler, false)
+ assert.EqualValues(t, 20, q.GetQueueItemNumber())
+}
diff --git a/modules/setting/config_provider.go b/modules/setting/config_provider.go
index 1685958298..24825a6205 100644
--- a/modules/setting/config_provider.go
+++ b/modules/setting/config_provider.go
@@ -42,12 +42,12 @@ type iniFileConfigProvider struct {
// NewEmptyConfigProvider create a new empty config provider
func NewEmptyConfigProvider() ConfigProvider {
- cp, _ := newConfigProviderFromData("")
+ cp, _ := NewConfigProviderFromData("")
return cp
}
-// newConfigProviderFromData this function is only for testing
-func newConfigProviderFromData(configContent string) (ConfigProvider, error) {
+// NewConfigProviderFromData creates a config provider from the given data; this function is only for testing
+func NewConfigProviderFromData(configContent string) (ConfigProvider, error) {
var cfg *ini.File
var err error
if configContent == "" {
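Reviewer note: since NewConfigProviderFromData is now exported, tests outside this package can build a ConfigProvider from an inline ini string. A hypothetical sketch (the test name, section, and keys are examples only):

	func TestQueueIniSketch(t *testing.T) {
		cfg, err := setting.NewConfigProviderFromData("[queue.issue_indexer]\nTYPE = level\n")
		assert.NoError(t, err)
		// Section/Key accessors follow the existing ConfigProvider usage seen elsewhere in this change
		assert.EqualValues(t, "level", cfg.Section("queue.issue_indexer").Key("TYPE").String())
	}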
diff --git a/modules/setting/cron_test.go b/modules/setting/cron_test.go
index 8d58cf8b48..3187ab18a2 100644
--- a/modules/setting/cron_test.go
+++ b/modules/setting/cron_test.go
@@ -26,7 +26,7 @@ BASE = true
SECOND = white rabbit
EXTEND = true
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
extended := &Extended{
diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go
index 8aee8596de..6836e62311 100644
--- a/modules/setting/indexer.go
+++ b/modules/setting/indexer.go
@@ -70,15 +70,6 @@ func loadIndexerFrom(rootCfg ConfigProvider) {
Indexer.IssueIndexerName = sec.Key("ISSUE_INDEXER_NAME").MustString(Indexer.IssueIndexerName)
- // The following settings are deprecated and can be overridden by settings in [queue] or [queue.issue_indexer]
- // DEPRECATED should not be removed because users maybe upgrade from lower version to the latest version
- // if these are removed, the warning will not be shown
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_TYPE", "queue.issue_indexer", "TYPE", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_DIR", "queue.issue_indexer", "DATADIR", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR", "queue.issue_indexer", "CONN_STR", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER", "queue.issue_indexer", "BATCH_LENGTH", "v1.19.0")
- deprecatedSetting(rootCfg, "indexer", "UPDATE_BUFFER_LEN", "queue.issue_indexer", "LENGTH", "v1.19.0")
-
Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
Indexer.RepoType = sec.Key("REPO_INDEXER_TYPE").MustString("bleve")
Indexer.RepoPath = filepath.ToSlash(sec.Key("REPO_INDEXER_PATH").MustString(filepath.ToSlash(filepath.Join(AppDataPath, "indexers/repos.bleve"))))
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
index 8c37e538bb..8673537b52 100644
--- a/modules/setting/queue.go
+++ b/modules/setting/queue.go
@@ -5,198 +5,109 @@ package setting
import (
"path/filepath"
- "strconv"
- "time"
- "code.gitea.io/gitea/modules/container"
+ "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
// QueueSettings represent the settings for a queue from the ini
type QueueSettings struct {
- Name string
- DataDir string
- QueueLength int `ini:"LENGTH"`
- BatchLength int
- ConnectionString string
- Type string
- QueueName string
- SetName string
- WrapIfNecessary bool
- MaxAttempts int
- Timeout time.Duration
- Workers int
- MaxWorkers int
- BlockTimeout time.Duration
- BoostTimeout time.Duration
- BoostWorkers int
-}
+ Name string // not an INI option; it is the name of the [queue.the-name] section
-// Queue settings
-var Queue = QueueSettings{}
+ Type string
+ Datadir string
+ ConnStr string // for leveldb or redis
+ Length int // max queue length before blocking
-// GetQueueSettings returns the queue settings for the appropriately named queue
-func GetQueueSettings(name string) QueueSettings {
- return getQueueSettings(CfgProvider, name)
-}
+ QueueName, SetName string // the name suffixes for storage (db key, redis key); "set" is only used by unique queues
-func getQueueSettings(rootCfg ConfigProvider, name string) QueueSettings {
- q := QueueSettings{}
- sec := rootCfg.Section("queue." + name)
- q.Name = name
-
- // DataDir is not directly inheritable
- q.DataDir = filepath.ToSlash(filepath.Join(Queue.DataDir, "common"))
- // QueueName is not directly inheritable either
- q.QueueName = name + Queue.QueueName
- for _, key := range sec.Keys() {
- switch key.Name() {
- case "DATADIR":
- q.DataDir = key.MustString(q.DataDir)
- case "QUEUE_NAME":
- q.QueueName = key.MustString(q.QueueName)
- case "SET_NAME":
- q.SetName = key.MustString(q.SetName)
- }
- }
- if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
- q.SetName = q.QueueName + Queue.SetName
- }
- if !filepath.IsAbs(q.DataDir) {
- q.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, q.DataDir))
- }
- _, _ = sec.NewKey("DATADIR", q.DataDir)
-
- // The rest are...
- q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
- q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
- q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
- q.Type = sec.Key("TYPE").MustString(Queue.Type)
- q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
- q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
- q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
- q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
- q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
- q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
- q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
- q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
-
- return q
+ BatchLength int
+ MaxWorkers int
}
-// LoadQueueSettings sets up the default settings for Queues
-// This is exported for tests to be able to use the queue
-func LoadQueueSettings() {
- loadQueueFrom(CfgProvider)
+var queueSettingsDefault = QueueSettings{
+ Type: "level", // dummy, channel, level, redis
+ Datadir: "queues/common", // relative to AppDataPath
+ Length: 100, // queue length before a channel queue will block
+
+ QueueName: "_queue",
+ SetName: "_unique",
+ BatchLength: 20,
+ MaxWorkers: 10,
}
-func loadQueueFrom(rootCfg ConfigProvider) {
- sec := rootCfg.Section("queue")
- Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
- if !filepath.IsAbs(Queue.DataDir) {
- Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
+func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
+ // deep copy default settings
+ cfg := QueueSettings{}
+ if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
+ return cfg, err
+ } else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
+ return cfg, err
}
- Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
- Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
- Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
- defaultType := sec.Key("TYPE").String()
- Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
- Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
- Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
- Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
- Queue.Workers = sec.Key("WORKERS").MustInt(0)
- Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
- Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
- Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
- Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(1)
- Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
- Queue.SetName = sec.Key("SET_NAME").MustString("")
-
- // Now handle the old issue_indexer configuration
- // FIXME: DEPRECATED to be removed in v1.18.0
- section := rootCfg.Section("queue.issue_indexer")
- directlySet := toDirectlySetKeysSet(section)
- if !directlySet.Contains("TYPE") && defaultType == "" {
- switch typ := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(""); typ {
- case "levelqueue":
- _, _ = section.NewKey("TYPE", "level")
- case "channel":
- _, _ = section.NewKey("TYPE", "persistable-channel")
- case "redis":
- _, _ = section.NewKey("TYPE", "redis")
- case "":
- _, _ = section.NewKey("TYPE", "level")
- default:
- log.Fatal("Unsupported indexer queue type: %v", typ)
+
+ cfg.Name = name
+ if sec, err := rootCfg.GetSection("queue"); err == nil {
+ if err = sec.MapTo(&cfg); err != nil {
+ log.Error("Failed to map queue common config for %q: %v", name, err)
+ return cfg, nil
}
}
- if !directlySet.Contains("LENGTH") {
- length := rootCfg.Section("indexer").Key("UPDATE_BUFFER_LEN").MustInt(0)
- if length != 0 {
- _, _ = section.NewKey("LENGTH", strconv.Itoa(length))
+ if sec, err := rootCfg.GetSection("queue." + name); err == nil {
+ if err = sec.MapTo(&cfg); err != nil {
+ log.Error("Failed to map queue spec config for %q: %v", name, err)
+ return cfg, nil
}
- }
- if !directlySet.Contains("BATCH_LENGTH") {
- fallback := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(0)
- if fallback != 0 {
- _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(fallback))
+ if sec.HasKey("CONN_STR") {
+ cfg.ConnStr = sec.Key("CONN_STR").String()
}
}
- if !directlySet.Contains("DATADIR") {
- queueDir := filepath.ToSlash(rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_DIR").MustString(""))
- if queueDir != "" {
- _, _ = section.NewKey("DATADIR", queueDir)
- }
+
+ if cfg.Datadir == "" {
+ cfg.Datadir = queueSettingsDefault.Datadir
}
- if !directlySet.Contains("CONN_STR") {
- connStr := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString("")
- if connStr != "" {
- _, _ = section.NewKey("CONN_STR", connStr)
- }
+ if !filepath.IsAbs(cfg.Datadir) {
+ cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
}
+ cfg.Datadir = filepath.ToSlash(cfg.Datadir)
- // FIXME: DEPRECATED to be removed in v1.18.0
- // - will need to set default for [queue.*)] LENGTH appropriately though though
-
- // Handle the old mailer configuration
- handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN", 100)
-
- // Handle the old test pull requests configuration
- // Please note this will be a unique queue
- handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH", 1000)
-
- // Handle the old mirror queue configuration
- // Please note this will be a unique queue
- handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH", 1000)
-}
-
-// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
-// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
-func handleOldLengthConfiguration(rootCfg ConfigProvider, queueName, oldSection, oldKey string, defaultValue int) {
- if rootCfg.Section(oldSection).HasKey(oldKey) {
- log.Error("Deprecated fallback for %s queue length `[%s]` `%s` present. Use `[queue.%s]` `LENGTH`. This will be removed in v1.18.0", queueName, queueName, oldSection, oldKey)
+ if cfg.Type == "redis" && cfg.ConnStr == "" {
+ cfg.ConnStr = "redis://127.0.0.1:6379/0"
}
- value := rootCfg.Section(oldSection).Key(oldKey).MustInt(defaultValue)
- // Don't override with 0
- if value <= 0 {
- return
+ if cfg.Length <= 0 {
+ cfg.Length = queueSettingsDefault.Length
}
-
- section := rootCfg.Section("queue." + queueName)
- directlySet := toDirectlySetKeysSet(section)
- if !directlySet.Contains("LENGTH") {
- _, _ = section.NewKey("LENGTH", strconv.Itoa(value))
+ if cfg.MaxWorkers <= 0 {
+ cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
}
+ if cfg.BatchLength <= 0 {
+ cfg.BatchLength = queueSettingsDefault.BatchLength
+ }
+
+ return cfg, nil
}
-// toDirectlySetKeysSet returns a set of keys directly set by this section
-// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
-// but this section does not.
-func toDirectlySetKeysSet(section ConfigSection) container.Set[string] {
- sections := make(container.Set[string])
- for _, key := range section.Keys() {
- sections.Add(key.Name())
+func LoadQueueSettings() {
+ loadQueueFrom(CfgProvider)
+}
+
+func loadQueueFrom(rootCfg ConfigProvider) {
+ hasOld := false
+ handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
+ if rootCfg.Section(oldSection).HasKey(oldKey) {
+ hasOld = true
+ log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName)
+ }
+ }
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN")
+ handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN")
+ handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH")
+ handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
+ if hasOld {
+ log.Fatal("Please update your app.ini to remove deprecated config options")
}
- return sections
}
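Reviewer note: a hypothetical sketch of resolving the new flattened queue settings. The ini content and queue name are examples only; the key-to-field mapping relies on the MapTo call above and Gitea's usual ini conventions, so treat it as an assumption rather than a guarantee.

	cfg, _ := setting.NewConfigProviderFromData("[queue.issue_indexer]\nTYPE = channel\nLENGTH = 50\n")
	qs, err := setting.GetQueueSettings(cfg, "issue_indexer")
	if err == nil {
		// unset fields fall back to queueSettingsDefault (e.g. BatchLength 20, MaxWorkers 10)
		log.Info("queue %s: type=%s length=%d workers=%d", qs.Name, qs.Type, qs.Length, qs.MaxWorkers)
	}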
diff --git a/modules/setting/storage_test.go b/modules/setting/storage_test.go
index 9c51bbc081..5e213606e3 100644
--- a/modules/setting/storage_test.go
+++ b/modules/setting/storage_test.go
@@ -19,7 +19,7 @@ MINIO_BUCKET = gitea-attachment
STORAGE_TYPE = minio
MINIO_ENDPOINT = my_minio:9000
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -42,7 +42,7 @@ MINIO_BUCKET = gitea-attachment
[storage.minio]
MINIO_BUCKET = gitea
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -64,7 +64,7 @@ MINIO_BUCKET = gitea-minio
[storage]
MINIO_BUCKET = gitea
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -87,7 +87,7 @@ MINIO_BUCKET = gitea
[storage]
STORAGE_TYPE = local
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -99,7 +99,7 @@ STORAGE_TYPE = local
}
func Test_getStorageGetDefaults(t *testing.T) {
- cfg, err := newConfigProviderFromData("")
+ cfg, err := NewConfigProviderFromData("")
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -120,7 +120,7 @@ MINIO_BUCKET = gitea-attachment
[storage]
MINIO_BUCKET = gitea-storage
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
{
@@ -154,7 +154,7 @@ STORAGE_TYPE = lfs
[storage.lfs]
MINIO_BUCKET = gitea-storage
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
{
@@ -178,7 +178,7 @@ func Test_getStorageInheritStorageType(t *testing.T) {
[storage]
STORAGE_TYPE = minio
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
@@ -193,7 +193,7 @@ func Test_getStorageInheritNameSectionType(t *testing.T) {
[storage.attachments]
STORAGE_TYPE = minio
`
- cfg, err := newConfigProviderFromData(iniStr)
+ cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
sec := cfg.Section("attachment")
diff --git a/modules/test/context_tests.go b/modules/test/context_tests.go
index 35dd920bb9..5ba2126126 100644
--- a/modules/test/context_tests.go
+++ b/modules/test/context_tests.go
@@ -26,6 +26,7 @@ import (
)
// MockContext mock context for unit tests
+// TODO: move this function to another package, because it depends on the "models" package
func MockContext(t *testing.T, path string) *context.Context {
resp := &mockResponseWriter{}
ctx := context.Context{
diff --git a/modules/testlogger/testlogger.go b/modules/testlogger/testlogger.go
new file mode 100644
index 0000000000..bf912f41dc
--- /dev/null
+++ b/modules/testlogger/testlogger.go
@@ -0,0 +1,212 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package testlogger
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "runtime"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "code.gitea.io/gitea/modules/json"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
+)
+
+var (
+ prefix string
+ SlowTest = 10 * time.Second
+ SlowFlush = 5 * time.Second
+)
+
+// TestLogger is a logger which will write to the testing log
+type TestLogger struct {
+ log.WriterLogger
+}
+
+var WriterCloser = &testLoggerWriterCloser{}
+
+type testLoggerWriterCloser struct {
+ sync.RWMutex
+ t []*testing.TB
+}
+
+func (w *testLoggerWriterCloser) pushT(t *testing.TB) {
+ w.Lock()
+ w.t = append(w.t, t)
+ w.Unlock()
+}
+
+func (w *testLoggerWriterCloser) Write(p []byte) (int, error) {
+ // There was a data race problem: the logger system could still try to output logs after the runner is finished.
+ // So we must ensure that the "t" in the stack is still valid.
+ w.RLock()
+ defer w.RUnlock()
+
+ var t *testing.TB
+ if len(w.t) > 0 {
+ t = w.t[len(w.t)-1]
+ }
+
+ if len(p) > 0 && p[len(p)-1] == '\n' {
+ p = p[:len(p)-1]
+ }
+
+ if t == nil || *t == nil {
+ return fmt.Fprintf(os.Stdout, "??? [Unknown Test] %s\n", p)
+ }
+
+ defer func() {
+ err := recover()
+ if err == nil {
+ return
+ }
+ var errString string
+ errErr, ok := err.(error)
+ if ok {
+ errString = errErr.Error()
+ } else {
+ errString, ok = err.(string)
+ }
+ if !ok {
+ panic(err)
+ }
+ if !strings.HasPrefix(errString, "Log in goroutine after ") {
+ panic(err)
+ }
+ }()
+
+ (*t).Log(string(p))
+ return len(p), nil
+}
+
+func (w *testLoggerWriterCloser) popT() {
+ w.Lock()
+ if len(w.t) > 0 {
+ w.t = w.t[:len(w.t)-1]
+ }
+ w.Unlock()
+}
+
+func (w *testLoggerWriterCloser) Close() error {
+ return nil
+}
+
+func (w *testLoggerWriterCloser) Reset() {
+ w.Lock()
+ if len(w.t) > 0 {
+ for _, t := range w.t {
+ if t == nil {
+ continue
+ }
+ fmt.Fprintf(os.Stdout, "Unclosed logger writer in test: %s", (*t).Name())
+ (*t).Errorf("Unclosed logger writer in test: %s", (*t).Name())
+ }
+ w.t = nil
+ }
+ w.Unlock()
+}
+
+// PrintCurrentTest prints the current test to os.Stdout
+func PrintCurrentTest(t testing.TB, skip ...int) func() {
+ start := time.Now()
+ actualSkip := 1
+ if len(skip) > 0 {
+ actualSkip = skip[0]
+ }
+ _, filename, line, _ := runtime.Caller(actualSkip)
+
+ if log.CanColorStdout {
+ fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", fmt.Formatter(log.NewColoredValue(t.Name())), strings.TrimPrefix(filename, prefix), line)
+ } else {
+ fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", t.Name(), strings.TrimPrefix(filename, prefix), line)
+ }
+ WriterCloser.pushT(&t)
+ return func() {
+ took := time.Since(start)
+ if took > SlowTest {
+ if log.CanColorStdout {
+ fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgYellow)), fmt.Formatter(log.NewColoredValue(took, log.Bold, log.FgYellow)))
+ } else {
+ fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", t.Name(), took)
+ }
+ }
+ timer := time.AfterFunc(SlowFlush, func() {
+ if log.CanColorStdout {
+ fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), SlowFlush)
+ } else {
+ fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), SlowFlush)
+ }
+ })
+ if err := queue.GetManager().FlushAll(context.Background(), time.Minute); err != nil {
+ t.Errorf("Flushing queues failed with error %v", err)
+ }
+ timer.Stop()
+ flushTook := time.Since(start) - took
+ if flushTook > SlowFlush {
+ if log.CanColorStdout {
+ fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), fmt.Formatter(log.NewColoredValue(flushTook, log.Bold, log.FgRed)))
+ } else {
+ fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", t.Name(), flushTook)
+ }
+ }
+ WriterCloser.popT()
+ }
+}
+
+// Printf takes a format and args and prints the string to os.Stdout
+func Printf(format string, args ...interface{}) {
+ if log.CanColorStdout {
+ for i := 0; i < len(args); i++ {
+ args[i] = log.NewColoredValue(args[i])
+ }
+ }
+ fmt.Fprintf(os.Stdout, "\t"+format, args...)
+}
+
+// NewTestLogger creates a TestLogger as a log.LoggerProvider
+func NewTestLogger() log.LoggerProvider {
+ logger := &TestLogger{}
+ logger.Colorize = log.CanColorStdout
+ logger.Level = log.TRACE
+ return logger
+}
+
+// Init initializes the connection writer with a JSON config.
+// The JSON config only needs the key "level".
+func (log *TestLogger) Init(config string) error {
+ err := json.Unmarshal([]byte(config), log)
+ if err != nil {
+ return err
+ }
+ log.NewWriterLogger(WriterCloser)
+ return nil
+}
+
+// Flush when log should be flushed
+func (log *TestLogger) Flush() {
+}
+
+// ReleaseReopen does nothing
+func (log *TestLogger) ReleaseReopen() error {
+ return nil
+}
+
+// GetName returns the default name for this implementation
+func (log *TestLogger) GetName() string {
+ return "test"
+}
+
+func init() {
+ const relFilePath = "modules/testlogger/testlogger.go"
+ _, filename, _, _ := runtime.Caller(0)
+ if !strings.HasSuffix(filename, relFilePath) {
+ panic("source code file path doesn't match expected: " + relFilePath)
+ }
+ prefix = strings.TrimSuffix(filename, relFilePath)
+}
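Reviewer note: a hypothetical sketch of how a test could hook the new testlogger package; the test name is an example, and the deferred-call pattern is assumed to match how the integration tests use PrintCurrentTest.

	func TestSomething(t *testing.T) {
		// the returned func pops "t" from the writer stack, flushes all queues, and reports slow tests/flushes
		defer testlogger.PrintCurrentTest(t)()
		// ... test body ...
	}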
diff --git a/modules/util/timer.go b/modules/util/timer.go
index d598fde73a..f9a7950709 100644
--- a/modules/util/timer.go
+++ b/modules/util/timer.go
@@ -8,18 +8,6 @@ import (
"time"
)
-// StopTimer is a utility function to safely stop a time.Timer and clean its channel
-func StopTimer(t *time.Timer) bool {
- stopped := t.Stop()
- if !stopped {
- select {
- case <-t.C:
- default:
- }
- }
- return stopped
-}
-
func Debounce(d time.Duration) func(f func()) {
type debouncer struct {
mu sync.Mutex