aboutsummaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2021-11-23 03:09:35 +0000
committerGitHub <noreply@github.com>2021-11-22 22:09:35 -0500
commit188fd2dd1a778ad140a569b31d2bf6c95e2a6bae (patch)
tree9e10531fbeaf7df945b0d10e3ffce620d3d6d696 /services
parent9450410ff71db5c6076fbe72e4b47fc9798b8d14 (diff)
downloadgitea-188fd2dd1a778ad140a569b31d2bf6c95e2a6bae.tar.gz
gitea-188fd2dd1a778ad140a569b31d2bf6c95e2a6bae.zip
Add `PULL_LIMIT` and `PUSH_LIMIT` to cron.update_mirror task (#17568)
Diffstat (limited to 'services')
-rw-r--r--services/cron/tasks_basic.go25
-rw-r--r--services/mirror/mirror.go52
2 files changed, 60 insertions, 17 deletions
diff --git a/services/cron/tasks_basic.go b/services/cron/tasks_basic.go
index 57fb399d4e..219173ccf0 100644
--- a/services/cron/tasks_basic.go
+++ b/services/cron/tasks_basic.go
@@ -18,13 +18,24 @@ import (
)
func registerUpdateMirrorTask() {
- RegisterTaskFatal("update_mirrors", &BaseConfig{
- Enabled: true,
- RunAtStart: false,
- Schedule: "@every 10m",
- NoSuccessNotice: true,
- }, func(ctx context.Context, _ *models.User, _ Config) error {
- return mirror_service.Update(ctx)
+ type UpdateMirrorTaskConfig struct {
+ BaseConfig
+ PullLimit int
+ PushLimit int
+ }
+
+ RegisterTaskFatal("update_mirrors", &UpdateMirrorTaskConfig{
+ BaseConfig: BaseConfig{
+ Enabled: true,
+ RunAtStart: false,
+ Schedule: "@every 10m",
+ NoSuccessNotice: true,
+ },
+ PullLimit: 50,
+ PushLimit: 50,
+ }, func(ctx context.Context, _ *models.User, cfg Config) error {
+ umtc := cfg.(*UpdateMirrorTaskConfig)
+ return mirror_service.Update(ctx, umtc.PullLimit, umtc.PushLimit)
})
}
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index eb37639bef..dae6f2807b 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -45,15 +45,19 @@ func doMirrorSync(ctx context.Context, req *SyncRequest) {
}
}
+var errLimit = fmt.Errorf("reached limit")
+
// Update checks and updates mirror repositories.
-func Update(ctx context.Context) error {
+func Update(ctx context.Context, pullLimit, pushLimit int) error {
if !setting.Mirror.Enabled {
log.Warn("Mirror feature disabled, but cron job enabled: skip update")
return nil
}
log.Trace("Doing: Update")
- handler := func(idx int, bean interface{}) error {
+ requested := 0
+
+ handler := func(idx int, bean interface{}, limit int) error {
var item SyncRequest
if m, ok := bean.(*models.Mirror); ok {
if m.Repo == nil {
@@ -78,21 +82,49 @@ func Update(ctx context.Context) error {
return nil
}
+ // Check we've not been cancelled
select {
case <-ctx.Done():
- return fmt.Errorf("Aborted")
+ return fmt.Errorf("aborted")
default:
- return mirrorQueue.Push(&item)
}
+
+ // Check if this request is already in the queue
+ has, err := mirrorQueue.Has(&item)
+ if err != nil {
+ return err
+ }
+ if has {
+ return nil
+ }
+
+ // Push to the Queue
+ if err := mirrorQueue.Push(&item); err != nil {
+ return err
+ }
+
+ requested++
+ if limit > 0 && requested > limit {
+ return errLimit
+ }
+ return nil
}
- if err := models.MirrorsIterate(handler); err != nil {
- log.Error("MirrorsIterate: %v", err)
- return err
+ if pullLimit != 0 {
+ if err := models.MirrorsIterate(func(idx int, bean interface{}) error {
+ return handler(idx, bean, pullLimit)
+ }); err != nil && err != errLimit {
+ log.Error("MirrorsIterate: %v", err)
+ return err
+ }
}
- if err := models.PushMirrorsIterate(handler); err != nil {
- log.Error("PushMirrorsIterate: %v", err)
- return err
+ if pushLimit != 0 {
+ if err := models.PushMirrorsIterate(func(idx int, bean interface{}) error {
+ return handler(idx, bean, pushLimit)
+ }); err != nil && err != errLimit {
+ log.Error("PushMirrorsIterate: %v", err)
+ return err
+ }
}
log.Trace("Finished: Update")
return nil