summaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2022-01-22 21:22:14 +0000
committerGitHub <noreply@github.com>2022-01-22 21:22:14 +0000
commita82fd98d5368a75cbcf6b74c12f58f3f81e66662 (patch)
treecb64c9348ee3d3194c786bb970770c06a8bd4fb1 /services
parent27ee01e1e866f2f13603af65224ddae77d5149d7 (diff)
downloadgitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.tar.gz
gitea-a82fd98d5368a75cbcf6b74c12f58f3f81e66662.zip
Pause queues (#15928)
* Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton <art27@cantab.net> * Create pushback interface Signed-off-by: Andrew Thornton <art27@cantab.net> * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton <art27@cantab.net> * Wire in UI for pausing Signed-off-by: Andrew Thornton <art27@cantab.net> * add testcases and fix a few issues Signed-off-by: Andrew Thornton <art27@cantab.net> * fix build Signed-off-by: Andrew Thornton <art27@cantab.net> * prevent "race" in the test Signed-off-by: Andrew Thornton <art27@cantab.net> * fix jsoniter mismerge Signed-off-by: Andrew Thornton <art27@cantab.net> * fix conflicts Signed-off-by: Andrew Thornton <art27@cantab.net> * fix format Signed-off-by: Andrew Thornton <art27@cantab.net> * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton <art27@cantab.net> * Use StopTimer Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Diffstat (limited to 'services')
-rw-r--r--services/mailer/mailer.go3
-rw-r--r--services/mirror/mirror.go3
-rw-r--r--services/pull/check.go3
-rw-r--r--services/pull/check_test.go3
-rw-r--r--services/repository/archiver/archiver.go3
-rw-r--r--services/repository/push.go3
-rw-r--r--services/task/task.go3
7 files changed, 14 insertions, 7 deletions
diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go
index eeb98b5879..3ca9b50fc6 100644
--- a/services/mailer/mailer.go
+++ b/services/mailer/mailer.go
@@ -346,7 +346,7 @@ func NewContext() {
Sender = &dummySender{}
}
- mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) {
+ mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) []queue.Data {
for _, datum := range data {
msg := datum.(*Message)
gomailMsg := msg.ToMessage()
@@ -357,6 +357,7 @@ func NewContext() {
log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info)
}
}
+ return nil
}, &Message{})
go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 2643200174..6f285ec467 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -130,11 +130,12 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
return nil
}
-func queueHandle(data ...queue.Data) {
+func queueHandle(data ...queue.Data) []queue.Data {
for _, datum := range data {
req := datum.(*SyncRequest)
doMirrorSync(graceful.GetManager().ShutdownContext(), req)
}
+ return nil
}
// InitSyncMirrors initializes a go routine to sync the mirrors
diff --git a/services/pull/check.go b/services/pull/check.go
index 3615c6c654..2203fb8749 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -216,12 +216,13 @@ func InitializePullRequests(ctx context.Context) {
}
// handle passed PR IDs and test the PRs
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
for _, datum := range data {
id, _ := strconv.ParseInt(datum.(string), 10, 64)
testPR(id)
}
+ return nil
}
func testPR(id int64) {
diff --git a/services/pull/check_test.go b/services/pull/check_test.go
index f0ec096ea9..4cdd17cc7b 100644
--- a/services/pull/check_test.go
+++ b/services/pull/check_test.go
@@ -22,11 +22,12 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
idChan := make(chan int64, 10)
- q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) {
+ q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) []queue.Data {
for _, datum := range data {
id, _ := strconv.ParseInt(datum.(string), 10, 64)
idChan <- id
}
+ return nil
}, queue.ChannelUniqueQueueConfiguration{
WorkerPoolConfiguration: queue.WorkerPoolConfiguration{
QueueLength: 10,
diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go
index f982e2ef7b..ad2141ef33 100644
--- a/services/repository/archiver/archiver.go
+++ b/services/repository/archiver/archiver.go
@@ -246,7 +246,7 @@ var archiverQueue queue.UniqueQueue
// Init initlize archive
func Init() error {
- handler := func(data ...queue.Data) {
+ handler := func(data ...queue.Data) []queue.Data {
for _, datum := range data {
archiveReq, ok := datum.(*ArchiveRequest)
if !ok {
@@ -258,6 +258,7 @@ func Init() error {
log.Error("Archive %v failed: %v", datum, err)
}
}
+ return nil
}
archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest))
diff --git a/services/repository/push.go b/services/repository/push.go
index 518ad04157..fafe4736ab 100644
--- a/services/repository/push.go
+++ b/services/repository/push.go
@@ -33,13 +33,14 @@ import (
var pushQueue queue.Queue
// handle passed PR IDs and test the PRs
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
for _, datum := range data {
opts := datum.([]*repo_module.PushUpdateOptions)
if err := pushUpdates(opts); err != nil {
log.Error("pushUpdate failed: %v", err)
}
}
+ return nil
}
func initPushQueue() error {
diff --git a/services/task/task.go b/services/task/task.go
index 376fe1dce1..3f823fc224 100644
--- a/services/task/task.go
+++ b/services/task/task.go
@@ -49,13 +49,14 @@ func Init() error {
return nil
}
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
for _, datum := range data {
task := datum.(*models.Task)
if err := Run(task); err != nil {
log.Error("Run task failed: %v", err)
}
}
+ return nil
}
// MigrateRepository add migration repository to task