aboutsummaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorzeripath <art27@cantab.net>2021-10-17 12:43:25 +0100
committerGitHub <noreply@github.com>2021-10-17 12:43:25 +0100
commit7117c7774ae1c1003c95d3df8dfaa5bd7270165d (patch)
treeeab8f310ad065af318a5c4dedb7efdb6251436cb /services
parentb9a2f263b8a4c19b01f3440b52f0f90f3c1ee072 (diff)
downloadgitea-7117c7774ae1c1003c95d3df8dfaa5bd7270165d.tar.gz
gitea-7117c7774ae1c1003c95d3df8dfaa5bd7270165d.zip
Make the Mirror Queue a queue (#17326)
Convert the old mirror syncing queue to the more modern queue format. Fix a bug in the from the repo-archive queue PR - the assumption was made that uniqueness could be enforced with by checking equality in a map in channel unique queues - however this only works for primitive types - which was the initial intention but is an imperfect. This is fixed by marshalling the data and placing the martialled data in the unique map instead. The documentation is also updated to add information about the deprecated configuration values. Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'services')
-rw-r--r--services/mirror/mirror.go98
1 files changed, 65 insertions, 33 deletions
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 7a3e37d993..eb37639bef 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -7,18 +7,43 @@ package mirror
import (
"context"
"fmt"
- "strconv"
- "strings"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/sync"
)
-// mirrorQueue holds an UniqueQueue object of the mirror
-var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
+var mirrorQueue queue.UniqueQueue
+
+// SyncType type of sync request
+type SyncType int
+
+const (
+ // PullMirrorType for pull mirrors
+ PullMirrorType SyncType = iota
+ // PushMirrorType for push mirrors
+ PushMirrorType
+)
+
+// SyncRequest for the mirror queue
+type SyncRequest struct {
+ Type SyncType
+ RepoID int64
+}
+
+// doMirrorSync causes this request to mirror itself
+func doMirrorSync(ctx context.Context, req *SyncRequest) {
+ switch req.Type {
+ case PushMirrorType:
+ _ = SyncPushMirror(ctx, req.RepoID)
+ case PullMirrorType:
+ _ = SyncPullMirror(ctx, req.RepoID)
+ default:
+ log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID)
+ }
+}
// Update checks and updates mirror repositories.
func Update(ctx context.Context) error {
@@ -29,19 +54,25 @@ func Update(ctx context.Context) error {
log.Trace("Doing: Update")
handler := func(idx int, bean interface{}) error {
- var item string
+ var item SyncRequest
if m, ok := bean.(*models.Mirror); ok {
if m.Repo == nil {
log.Error("Disconnected mirror found: %d", m.ID)
return nil
}
- item = fmt.Sprintf("pull %d", m.RepoID)
+ item = SyncRequest{
+ Type: PullMirrorType,
+ RepoID: m.RepoID,
+ }
} else if m, ok := bean.(*models.PushMirror); ok {
if m.Repo == nil {
log.Error("Disconnected push-mirror found: %d", m.ID)
return nil
}
- item = fmt.Sprintf("push %d", m.ID)
+ item = SyncRequest{
+ Type: PushMirrorType,
+ RepoID: m.RepoID,
+ }
} else {
log.Error("Unknown bean: %v", bean)
return nil
@@ -51,8 +82,7 @@ func Update(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("Aborted")
default:
- mirrorQueue.Add(item)
- return nil
+ return mirrorQueue.Push(&item)
}
}
@@ -68,26 +98,10 @@ func Update(ctx context.Context) error {
return nil
}
-// syncMirrors checks and syncs mirrors.
-// FIXME: graceful: this should be a persistable queue
-func syncMirrors(ctx context.Context) {
- // Start listening on new sync requests.
- for {
- select {
- case <-ctx.Done():
- mirrorQueue.Close()
- return
- case item := <-mirrorQueue.Queue():
- id, _ := strconv.ParseInt(item[5:], 10, 64)
- if strings.HasPrefix(item, "pull") {
- _ = SyncPullMirror(ctx, id)
- } else if strings.HasPrefix(item, "push") {
- _ = SyncPushMirror(ctx, id)
- } else {
- log.Error("Unknown item in queue: %v", item)
- }
- mirrorQueue.Remove(item)
- }
+func queueHandle(data ...queue.Data) {
+ for _, datum := range data {
+ req := datum.(*SyncRequest)
+ doMirrorSync(graceful.GetManager().ShutdownContext(), req)
}
}
@@ -96,7 +110,9 @@ func InitSyncMirrors() {
if !setting.Mirror.Enabled {
return
}
- go graceful.GetManager().RunWithShutdownContext(syncMirrors)
+ mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
+
+ go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}
// StartToMirror adds repoID to mirror queue
@@ -104,7 +120,15 @@ func StartToMirror(repoID int64) {
if !setting.Mirror.Enabled {
return
}
- go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID))
+ go func() {
+ err := mirrorQueue.Push(&SyncRequest{
+ Type: PullMirrorType,
+ RepoID: repoID,
+ })
+ if err != nil {
+ log.Error("Unable to push sync request for to the queue for push mirror repo[%d]: Error: %v", repoID, err)
+ }
+ }()
}
// AddPushMirrorToQueue adds the push mirror to the queue
@@ -112,5 +136,13 @@ func AddPushMirrorToQueue(mirrorID int64) {
if !setting.Mirror.Enabled {
return
}
- go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID))
+ go func() {
+ err := mirrorQueue.Push(&SyncRequest{
+ Type: PushMirrorType,
+ RepoID: mirrorID,
+ })
+ if err != nil {
+ log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err)
+ }
+ }()
}