diff options
author | Chongyi Zheng <git@zcy.dev> | 2023-08-26 22:24:45 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-27 10:24:45 +0800 |
commit | 43652746f2929aeb53ec1e575c3691050755a0b5 (patch) | |
tree | 8f2f05ca05fc6ad1b5c557c77a95f17c0f2381bc /services/mirror | |
parent | 37b3ba22a1dcf88cf4bfa4a7861703c8a5701f5c (diff) | |
download | gitea-43652746f2929aeb53ec1e575c3691050755a0b5.tar.gz gitea-43652746f2929aeb53ec1e575c3691050755a0b5.zip |
Move `modules/mirror` to `services` (#26737)
To solve the cyclic imports in a better way
Closes #20261
Diffstat (limited to 'services/mirror')
-rw-r--r-- | services/mirror/mirror.go | 21 | ||||
-rw-r--r-- | services/mirror/mirror_push.go | 12 | ||||
-rw-r--r-- | services/mirror/notifier.go | 32 | ||||
-rw-r--r-- | services/mirror/queue.go | 70 |
4 files changed, 124 insertions, 11 deletions
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index abce1d3c2d..0fc871b214 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -10,21 +10,20 @@ import ( repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" - mirror_module "code.gitea.io/gitea/modules/mirror" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" ) // doMirrorSync causes this request to mirror itself -func doMirrorSync(ctx context.Context, req *mirror_module.SyncRequest) { +func doMirrorSync(ctx context.Context, req *SyncRequest) { if req.ReferenceID == 0 { log.Warn("Skipping mirror sync request, no mirror ID was specified") return } switch req.Type { - case mirror_module.PushMirrorType: + case PushMirrorType: _ = SyncPushMirror(ctx, req.ReferenceID) - case mirror_module.PullMirrorType: + case PullMirrorType: _ = SyncPullMirror(ctx, req.ReferenceID) default: log.Error("Unknown Request type in queue: %v for MirrorID[%d]", req.Type, req.ReferenceID) @@ -43,7 +42,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { handler := func(idx int, bean any) error { var repo *repo_model.Repository - var mirrorType mirror_module.SyncType + var mirrorType SyncType var referenceID int64 if m, ok := bean.(*repo_model.Mirror); ok { @@ -52,7 +51,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { return nil } repo = m.Repo - mirrorType = mirror_module.PullMirrorType + mirrorType = PullMirrorType referenceID = m.RepoID } else if m, ok := bean.(*repo_model.PushMirror); ok { if m.GetRepository() == nil { @@ -60,7 +59,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { return nil } repo = m.Repo - mirrorType = mirror_module.PushMirrorType + mirrorType = PushMirrorType referenceID = m.ID } else { log.Error("Unknown bean: %v", bean) @@ -75,9 +74,9 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { } // Push to the Queue - if err := mirror_module.PushToQueue(mirrorType, referenceID); err != nil { + if err := PushToQueue(mirrorType, referenceID); err != nil { if err == queue.ErrAlreadyInQueue { - if mirrorType == mirror_module.PushMirrorType { + if mirrorType == PushMirrorType { log.Trace("PushMirrors for %-v already queued for sync", repo) } else { log.Trace("PullMirrors for %-v already queued for sync", repo) @@ -120,7 +119,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { return nil } -func queueHandler(items ...*mirror_module.SyncRequest) []*mirror_module.SyncRequest { +func queueHandler(items ...*SyncRequest) []*SyncRequest { for _, req := range items { doMirrorSync(graceful.GetManager().ShutdownContext(), req) } @@ -129,5 +128,5 @@ func queueHandler(items ...*mirror_module.SyncRequest) []*mirror_module.SyncRequ // InitSyncMirrors initializes a go routine to sync the mirrors func InitSyncMirrors() { - mirror_module.StartSyncMirrors(queueHandler) + StartSyncMirrors(queueHandler) } diff --git a/services/mirror/mirror_push.go b/services/mirror/mirror_push.go index 2c1b00b60c..594d31df89 100644 --- a/services/mirror/mirror_push.go +++ b/services/mirror/mirror_push.go @@ -253,3 +253,15 @@ func pushAllLFSObjects(ctx context.Context, gitRepo *git.Repository, lfsClient l return nil } + +func syncPushMirrorWithSyncOnCommit(ctx context.Context, repoID int64) { + pushMirrors, err := repo_model.GetPushMirrorsSyncedOnCommit(ctx, repoID) + if err != nil { + log.Error("repo_model.GetPushMirrorsSyncedOnCommit failed: %v", err) + return + } + + for _, mirror := range pushMirrors { + AddPushMirrorToQueue(mirror.ID) + } +} diff --git a/services/mirror/notifier.go b/services/mirror/notifier.go new file mode 100644 index 0000000000..e0e1b443e0 --- /dev/null +++ b/services/mirror/notifier.go @@ -0,0 +1,32 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package mirror + +import ( + "context" + + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/notification" + "code.gitea.io/gitea/modules/notification/base" + "code.gitea.io/gitea/modules/repository" +) + +func init() { + notification.RegisterNotifier(&mirrorNotifier{}) +} + +type mirrorNotifier struct { + base.NullNotifier +} + +var _ base.Notifier = &mirrorNotifier{} + +func (m *mirrorNotifier) NotifyPushCommits(ctx context.Context, _ *user_model.User, repo *repo_model.Repository, _ *repository.PushUpdateOptions, _ *repository.PushCommits) { + syncPushMirrorWithSyncOnCommit(ctx, repo.ID) +} + +func (m *mirrorNotifier) NotifySyncPushCommits(ctx context.Context, _ *user_model.User, repo *repo_model.Repository, _ *repository.PushUpdateOptions, _ *repository.PushCommits) { + syncPushMirrorWithSyncOnCommit(ctx, repo.ID) +} diff --git a/services/mirror/queue.go b/services/mirror/queue.go new file mode 100644 index 0000000000..0d9a624730 --- /dev/null +++ b/services/mirror/queue.go @@ -0,0 +1,70 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package mirror + +import ( + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" + "code.gitea.io/gitea/modules/setting" +) + +var mirrorQueue *queue.WorkerPoolQueue[*SyncRequest] + +// SyncType type of sync request +type SyncType int + +const ( + // PullMirrorType for pull mirrors + PullMirrorType SyncType = iota + // PushMirrorType for push mirrors + PushMirrorType +) + +// SyncRequest for the mirror queue +type SyncRequest struct { + Type SyncType + ReferenceID int64 // RepoID for pull mirror, MirrorID for push mirror +} + +// StartSyncMirrors starts a go routine to sync the mirrors +func StartSyncMirrors(queueHandle func(data ...*SyncRequest) []*SyncRequest) { + if !setting.Mirror.Enabled { + return + } + mirrorQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "mirror", queueHandle) + if mirrorQueue == nil { + log.Fatal("Unable to create mirror queue") + } + go graceful.GetManager().RunWithCancel(mirrorQueue) +} + +// AddPullMirrorToQueue adds repoID to mirror queue +func AddPullMirrorToQueue(repoID int64) { + addMirrorToQueue(PullMirrorType, repoID) +} + +// AddPushMirrorToQueue adds the push mirror to the queue +func AddPushMirrorToQueue(mirrorID int64) { + addMirrorToQueue(PushMirrorType, mirrorID) +} + +func addMirrorToQueue(syncType SyncType, referenceID int64) { + if !setting.Mirror.Enabled { + return + } + go func() { + if err := PushToQueue(syncType, referenceID); err != nil { + log.Error("Unable to push sync request for to the queue for pull mirror repo[%d]. Error: %v", referenceID, err) + } + }() +} + +// PushToQueue adds the sync request to the queue +func PushToQueue(mirrorType SyncType, referenceID int64) error { + return mirrorQueue.Push(&SyncRequest{ + Type: mirrorType, + ReferenceID: referenceID, + }) +} |