diff options
Diffstat (limited to 'services')
-rw-r--r-- | services/mirror/mirror.go | 98 |
1 files changed, 65 insertions, 33 deletions
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 7a3e37d993..eb37639bef 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -7,18 +7,43 @@ package mirror import ( "context" "fmt" - "strconv" - "strings" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" ) -// mirrorQueue holds an UniqueQueue object of the mirror -var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength) +var mirrorQueue queue.UniqueQueue + +// SyncType type of sync request +type SyncType int + +const ( + // PullMirrorType for pull mirrors + PullMirrorType SyncType = iota + // PushMirrorType for push mirrors + PushMirrorType +) + +// SyncRequest for the mirror queue +type SyncRequest struct { + Type SyncType + RepoID int64 +} + +// doMirrorSync causes this request to mirror itself +func doMirrorSync(ctx context.Context, req *SyncRequest) { + switch req.Type { + case PushMirrorType: + _ = SyncPushMirror(ctx, req.RepoID) + case PullMirrorType: + _ = SyncPullMirror(ctx, req.RepoID) + default: + log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID) + } +} // Update checks and updates mirror repositories. func Update(ctx context.Context) error { @@ -29,19 +54,25 @@ func Update(ctx context.Context) error { log.Trace("Doing: Update") handler := func(idx int, bean interface{}) error { - var item string + var item SyncRequest if m, ok := bean.(*models.Mirror); ok { if m.Repo == nil { log.Error("Disconnected mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("pull %d", m.RepoID) + item = SyncRequest{ + Type: PullMirrorType, + RepoID: m.RepoID, + } } else if m, ok := bean.(*models.PushMirror); ok { if m.Repo == nil { log.Error("Disconnected push-mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("push %d", m.ID) + item = SyncRequest{ + Type: PushMirrorType, + RepoID: m.RepoID, + } } else { log.Error("Unknown bean: %v", bean) return nil @@ -51,8 +82,7 @@ func Update(ctx context.Context) error { case <-ctx.Done(): return fmt.Errorf("Aborted") default: - mirrorQueue.Add(item) - return nil + return mirrorQueue.Push(&item) } } @@ -68,26 +98,10 @@ func Update(ctx context.Context) error { return nil } -// syncMirrors checks and syncs mirrors. -// FIXME: graceful: this should be a persistable queue -func syncMirrors(ctx context.Context) { - // Start listening on new sync requests. - for { - select { - case <-ctx.Done(): - mirrorQueue.Close() - return - case item := <-mirrorQueue.Queue(): - id, _ := strconv.ParseInt(item[5:], 10, 64) - if strings.HasPrefix(item, "pull") { - _ = SyncPullMirror(ctx, id) - } else if strings.HasPrefix(item, "push") { - _ = SyncPushMirror(ctx, id) - } else { - log.Error("Unknown item in queue: %v", item) - } - mirrorQueue.Remove(item) - } +func queueHandle(data ...queue.Data) { + for _, datum := range data { + req := datum.(*SyncRequest) + doMirrorSync(graceful.GetManager().ShutdownContext(), req) } } @@ -96,7 +110,9 @@ func InitSyncMirrors() { if !setting.Mirror.Enabled { return } - go graceful.GetManager().RunWithShutdownContext(syncMirrors) + mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest)) + + go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) } // StartToMirror adds repoID to mirror queue @@ -104,7 +120,15 @@ func StartToMirror(repoID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID)) + go func() { + err := mirrorQueue.Push(&SyncRequest{ + Type: PullMirrorType, + RepoID: repoID, + }) + if err != nil { + log.Error("Unable to push sync request for to the queue for push mirror repo[%d]: Error: %v", repoID, err) + } + }() } // AddPushMirrorToQueue adds the push mirror to the queue @@ -112,5 +136,13 @@ func AddPushMirrorToQueue(mirrorID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID)) + go func() { + err := mirrorQueue.Push(&SyncRequest{ + Type: PushMirrorType, + RepoID: mirrorID, + }) + if err != nil { + log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err) + } + }() } |