diff options
author | zeripath <art27@cantab.net> | 2021-11-23 03:09:35 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-22 22:09:35 -0500 |
commit | 188fd2dd1a778ad140a569b31d2bf6c95e2a6bae (patch) | |
tree | 9e10531fbeaf7df945b0d10e3ffce620d3d6d696 /services | |
parent | 9450410ff71db5c6076fbe72e4b47fc9798b8d14 (diff) | |
download | gitea-188fd2dd1a778ad140a569b31d2bf6c95e2a6bae.tar.gz gitea-188fd2dd1a778ad140a569b31d2bf6c95e2a6bae.zip |
Add `PULL_LIMIT` and `PUSH_LIMIT` to cron.update_mirror task (#17568)
Diffstat (limited to 'services')
-rw-r--r-- | services/cron/tasks_basic.go | 25 | ||||
-rw-r--r-- | services/mirror/mirror.go | 52 |
2 files changed, 60 insertions, 17 deletions
diff --git a/services/cron/tasks_basic.go b/services/cron/tasks_basic.go index 57fb399d4e..219173ccf0 100644 --- a/services/cron/tasks_basic.go +++ b/services/cron/tasks_basic.go @@ -18,13 +18,24 @@ import ( ) func registerUpdateMirrorTask() { - RegisterTaskFatal("update_mirrors", &BaseConfig{ - Enabled: true, - RunAtStart: false, - Schedule: "@every 10m", - NoSuccessNotice: true, - }, func(ctx context.Context, _ *models.User, _ Config) error { - return mirror_service.Update(ctx) + type UpdateMirrorTaskConfig struct { + BaseConfig + PullLimit int + PushLimit int + } + + RegisterTaskFatal("update_mirrors", &UpdateMirrorTaskConfig{ + BaseConfig: BaseConfig{ + Enabled: true, + RunAtStart: false, + Schedule: "@every 10m", + NoSuccessNotice: true, + }, + PullLimit: 50, + PushLimit: 50, + }, func(ctx context.Context, _ *models.User, cfg Config) error { + umtc := cfg.(*UpdateMirrorTaskConfig) + return mirror_service.Update(ctx, umtc.PullLimit, umtc.PushLimit) }) } diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index eb37639bef..dae6f2807b 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -45,15 +45,19 @@ func doMirrorSync(ctx context.Context, req *SyncRequest) { } } +var errLimit = fmt.Errorf("reached limit") + // Update checks and updates mirror repositories. -func Update(ctx context.Context) error { +func Update(ctx context.Context, pullLimit, pushLimit int) error { if !setting.Mirror.Enabled { log.Warn("Mirror feature disabled, but cron job enabled: skip update") return nil } log.Trace("Doing: Update") - handler := func(idx int, bean interface{}) error { + requested := 0 + + handler := func(idx int, bean interface{}, limit int) error { var item SyncRequest if m, ok := bean.(*models.Mirror); ok { if m.Repo == nil { @@ -78,21 +82,49 @@ func Update(ctx context.Context) error { return nil } + // Check we've not been cancelled select { case <-ctx.Done(): - return fmt.Errorf("Aborted") + return fmt.Errorf("aborted") default: - return mirrorQueue.Push(&item) } + + // Check if this request is already in the queue + has, err := mirrorQueue.Has(&item) + if err != nil { + return err + } + if has { + return nil + } + + // Push to the Queue + if err := mirrorQueue.Push(&item); err != nil { + return err + } + + requested++ + if limit > 0 && requested > limit { + return errLimit + } + return nil } - if err := models.MirrorsIterate(handler); err != nil { - log.Error("MirrorsIterate: %v", err) - return err + if pullLimit != 0 { + if err := models.MirrorsIterate(func(idx int, bean interface{}) error { + return handler(idx, bean, pullLimit) + }); err != nil && err != errLimit { + log.Error("MirrorsIterate: %v", err) + return err + } } - if err := models.PushMirrorsIterate(handler); err != nil { - log.Error("PushMirrorsIterate: %v", err) - return err + if pushLimit != 0 { + if err := models.PushMirrorsIterate(func(idx int, bean interface{}) error { + return handler(idx, bean, pushLimit) + }); err != nil && err != errLimit { + log.Error("PushMirrorsIterate: %v", err) + return err + } } log.Trace("Finished: Update") return nil |