aboutsummaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
Diffstat (limited to 'services')
-rw-r--r--services/mirror/mirror.go98
1 files changed, 65 insertions, 33 deletions
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 7a3e37d993..eb37639bef 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -7,18 +7,43 @@ package mirror
import (
"context"
"fmt"
- "strconv"
- "strings"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/sync"
)
-// mirrorQueue holds an UniqueQueue object of the mirror
-var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
+var mirrorQueue queue.UniqueQueue
+
+// SyncType type of sync request
+type SyncType int
+
+const (
+ // PullMirrorType for pull mirrors
+ PullMirrorType SyncType = iota
+ // PushMirrorType for push mirrors
+ PushMirrorType
+)
+
+// SyncRequest for the mirror queue
+type SyncRequest struct {
+ Type SyncType
+ RepoID int64
+}
+
+// doMirrorSync causes this request to mirror itself
+func doMirrorSync(ctx context.Context, req *SyncRequest) {
+ switch req.Type {
+ case PushMirrorType:
+ _ = SyncPushMirror(ctx, req.RepoID)
+ case PullMirrorType:
+ _ = SyncPullMirror(ctx, req.RepoID)
+ default:
+ log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID)
+ }
+}
// Update checks and updates mirror repositories.
func Update(ctx context.Context) error {
@@ -29,19 +54,25 @@ func Update(ctx context.Context) error {
log.Trace("Doing: Update")
handler := func(idx int, bean interface{}) error {
- var item string
+ var item SyncRequest
if m, ok := bean.(*models.Mirror); ok {
if m.Repo == nil {
log.Error("Disconnected mirror found: %d", m.ID)
return nil
}
- item = fmt.Sprintf("pull %d", m.RepoID)
+ item = SyncRequest{
+ Type: PullMirrorType,
+ RepoID: m.RepoID,
+ }
} else if m, ok := bean.(*models.PushMirror); ok {
if m.Repo == nil {
log.Error("Disconnected push-mirror found: %d", m.ID)
return nil
}
- item = fmt.Sprintf("push %d", m.ID)
+ item = SyncRequest{
+ Type: PushMirrorType,
+ RepoID: m.RepoID,
+ }
} else {
log.Error("Unknown bean: %v", bean)
return nil
@@ -51,8 +82,7 @@ func Update(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("Aborted")
default:
- mirrorQueue.Add(item)
- return nil
+ return mirrorQueue.Push(&item)
}
}
@@ -68,26 +98,10 @@ func Update(ctx context.Context) error {
return nil
}
-// syncMirrors checks and syncs mirrors.
-// FIXME: graceful: this should be a persistable queue
-func syncMirrors(ctx context.Context) {
- // Start listening on new sync requests.
- for {
- select {
- case <-ctx.Done():
- mirrorQueue.Close()
- return
- case item := <-mirrorQueue.Queue():
- id, _ := strconv.ParseInt(item[5:], 10, 64)
- if strings.HasPrefix(item, "pull") {
- _ = SyncPullMirror(ctx, id)
- } else if strings.HasPrefix(item, "push") {
- _ = SyncPushMirror(ctx, id)
- } else {
- log.Error("Unknown item in queue: %v", item)
- }
- mirrorQueue.Remove(item)
- }
+func queueHandle(data ...queue.Data) {
+ for _, datum := range data {
+ req := datum.(*SyncRequest)
+ doMirrorSync(graceful.GetManager().ShutdownContext(), req)
}
}
@@ -96,7 +110,9 @@ func InitSyncMirrors() {
if !setting.Mirror.Enabled {
return
}
- go graceful.GetManager().RunWithShutdownContext(syncMirrors)
+ mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
+
+ go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}
// StartToMirror adds repoID to mirror queue
@@ -104,7 +120,15 @@ func StartToMirror(repoID int64) {
if !setting.Mirror.Enabled {
return
}
- go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID))
+ go func() {
+ err := mirrorQueue.Push(&SyncRequest{
+ Type: PullMirrorType,
+ RepoID: repoID,
+ })
+ if err != nil {
+ log.Error("Unable to push sync request for to the queue for push mirror repo[%d]: Error: %v", repoID, err)
+ }
+ }()
}
// AddPushMirrorToQueue adds the push mirror to the queue
@@ -112,5 +136,13 @@ func AddPushMirrorToQueue(mirrorID int64) {
if !setting.Mirror.Enabled {
return
}
- go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID))
+ go func() {
+ err := mirrorQueue.Push(&SyncRequest{
+ Type: PushMirrorType,
+ RepoID: mirrorID,
+ })
+ if err != nil {
+ log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err)
+ }
+ }()
}