]> source.dussan.org Git - gitea.git/commitdiff
Make LFS http_client parallel within a batch. (#32369)
authorRoyce Remer <royceremer@gmail.com>
Mon, 4 Nov 2024 04:49:08 +0000 (20:49 -0800)
committerGitHub <noreply@github.com>
Mon, 4 Nov 2024 04:49:08 +0000 (04:49 +0000)
Signed-off-by: Royce Remer <royceremer@gmail.com>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
custom/conf/app.example.ini
go.mod
modules/lfs/http_client.go
modules/lfs/http_client_test.go
modules/repository/repo.go
modules/setting/lfs.go
modules/setting/lfs_test.go

index 69b57a8c012574c60a137dbc77dc3a068c16eb80..e080b0be7273398ca8b680b3dfa507be56187568 100644 (file)
@@ -2642,9 +2642,15 @@ LEVEL = Info
 ;; override the azure blob base path if storage type is azureblob
 ;AZURE_BLOB_BASE_PATH = lfs/
 
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;; settings for Gitea's LFS client (eg: mirroring an upstream lfs endpoint)
+;;
 ;[lfs_client]
-;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number
+;; Limit the number of pointers in each batch request to this number
 ;BATCH_SIZE = 20
+;; Limit the number of concurrent upload/download operations within a batch
+;BATCH_OPERATION_CONCURRENCY = 3
 
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
diff --git a/go.mod b/go.mod
index c98ef9a61bf4ad0bbffcef9ba2110664a661b0d1..ff0d612133e36bc05aba6e81310348e0d355a350 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -124,6 +124,7 @@ require (
        golang.org/x/image v0.21.0
        golang.org/x/net v0.30.0
        golang.org/x/oauth2 v0.23.0
+       golang.org/x/sync v0.8.0
        golang.org/x/sys v0.26.0
        golang.org/x/text v0.19.0
        golang.org/x/tools v0.26.0
@@ -316,7 +317,6 @@ require (
        go.uber.org/zap v1.27.0 // indirect
        golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
        golang.org/x/mod v0.21.0 // indirect
-       golang.org/x/sync v0.8.0 // indirect
        golang.org/x/time v0.7.0 // indirect
        google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
        gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
index aa9e744d72b8fd023c6b88402ef0160b564c5dbd..411c4248c4aa9c33f853e20aba2116595316bf15 100644 (file)
@@ -17,6 +17,8 @@ import (
        "code.gitea.io/gitea/modules/log"
        "code.gitea.io/gitea/modules/proxy"
        "code.gitea.io/gitea/modules/setting"
+
+       "golang.org/x/sync/errgroup"
 )
 
 // HTTPClient is used to communicate with the LFS server
@@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
        return c.performOperation(ctx, objects, nil, callback)
 }
 
+// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
 func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
        if len(objects) == 0 {
                return nil
@@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
                return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
        }
 
+       errGroup, groupCtx := errgroup.WithContext(ctx)
+       errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency)
        for _, object := range result.Objects {
-               if object.Error != nil {
-                       log.Trace("Error on object %v: %v", object.Pointer, object.Error)
-                       if uc != nil {
-                               if _, err := uc(object.Pointer, object.Error); err != nil {
-                                       return err
-                               }
-                       } else {
-                               if err := dc(object.Pointer, nil, object.Error); err != nil {
-                                       return err
-                               }
-                       }
-                       continue
-               }
+               errGroup.Go(func() error {
+                       return performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
+               })
+       }
 
-               if uc != nil {
-                       if len(object.Actions) == 0 {
-                               log.Trace("%v already present on server", object.Pointer)
-                               continue
-                       }
+       // only the first error is returned, preserving legacy behavior before concurrency
+       return errGroup.Wait()
+}
 
-                       link, ok := object.Actions["upload"]
-                       if !ok {
-                               log.Debug("%+v", object)
-                               return errors.New("missing action 'upload'")
-                       }
+// performSingleOperation performs an LFS upload or download operation on a single object
+func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
+       // the response from a lfs batch api request for this specific object id contained an error
+       if object.Error != nil {
+               log.Trace("Error on object %v: %v", object.Pointer, object.Error)
 
-                       content, err := uc(object.Pointer, nil)
-                       if err != nil {
+               // this was an 'upload' request inside the batch request
+               if uc != nil {
+                       if _, err := uc(object.Pointer, object.Error); err != nil {
                                return err
                        }
-
-                       err = transferAdapter.Upload(ctx, link, object.Pointer, content)
-                       if err != nil {
+               } else {
+                       // this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
+                       if err := dc(object.Pointer, nil, object.Error); err != nil {
                                return err
                        }
+               }
+               // if the callback returns no err, then the error could be ignored, and the operations should continue
+               return nil
+       }
 
-                       link, ok = object.Actions["verify"]
-                       if ok {
-                               if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
-                                       return err
-                               }
-                       }
-               } else {
-                       link, ok := object.Actions["download"]
-                       if !ok {
-                               // no actions block in response, try legacy response schema
-                               link, ok = object.Links["download"]
-                       }
-                       if !ok {
-                               log.Debug("%+v", object)
-                               return errors.New("missing action 'download'")
-                       }
+       // the response from an lfs batch api request contained necessary upload/download fields to act upon
+       if uc != nil {
+               if len(object.Actions) == 0 {
+                       log.Trace("%v already present on server", object.Pointer)
+                       return nil
+               }
 
-                       content, err := transferAdapter.Download(ctx, link)
-                       if err != nil {
-                               return err
-                       }
+               link, ok := object.Actions["upload"]
+               if !ok {
+                       return errors.New("missing action 'upload'")
+               }
+
+               content, err := uc(object.Pointer, nil)
+               if err != nil {
+                       return err
+               }
 
-                       if err := dc(object.Pointer, content, nil); err != nil {
+               err = transferAdapter.Upload(ctx, link, object.Pointer, content)
+               if err != nil {
+                       return err
+               }
+
+               link, ok = object.Actions["verify"]
+               if ok {
+                       if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
                                return err
                        }
                }
-       }
+       } else {
+               link, ok := object.Actions["download"]
+               if !ok {
+                       // no actions block in response, try legacy response schema
+                       link, ok = object.Links["download"]
+               }
+               if !ok {
+                       log.Debug("%+v", object)
+                       return errors.New("missing action 'download'")
+               }
 
+               content, err := transferAdapter.Download(ctx, link)
+               if err != nil {
+                       return err
+               }
+
+               if err := dc(object.Pointer, content, nil); err != nil {
+                       return err
+               }
+       }
        return nil
 }
 
index ec90f5375d1b935ce98f6592291d51c14d5da028..d22735147a55615cfba3ebc5f8677f1558d342d1 100644 (file)
@@ -12,6 +12,8 @@ import (
        "testing"
 
        "code.gitea.io/gitea/modules/json"
+       "code.gitea.io/gitea/modules/setting"
+       "code.gitea.io/gitea/modules/test"
 
        "github.com/stretchr/testify/assert"
 )
@@ -183,93 +185,84 @@ func TestHTTPClientDownload(t *testing.T) {
 
        cases := []struct {
                endpoint      string
-               expectederror string
+               expectedError string
        }{
-               // case 0
                {
                        endpoint:      "https://status-not-ok.io",
-                       expectederror: io.ErrUnexpectedEOF.Error(),
+                       expectedError: io.ErrUnexpectedEOF.Error(),
                },
-               // case 1
                {
                        endpoint:      "https://invalid-json-response.io",
-                       expectederror: "invalid json",
+                       expectedError: "invalid json",
                },
-               // case 2
                {
                        endpoint:      "https://valid-batch-request-download.io",
-                       expectederror: "",
+                       expectedError: "",
                },
-               // case 3
                {
                        endpoint:      "https://response-no-objects.io",
-                       expectederror: "",
+                       expectedError: "",
                },
-               // case 4
                {
                        endpoint:      "https://unknown-transfer-adapter.io",
-                       expectederror: "TransferAdapter not found: ",
+                       expectedError: "TransferAdapter not found: ",
                },
-               // case 5
                {
                        endpoint:      "https://error-in-response-objects.io",
-                       expectederror: "Object not found",
+                       expectedError: "Object not found",
                },
-               // case 6
                {
                        endpoint:      "https://empty-actions-map.io",
-                       expectederror: "missing action 'download'",
+                       expectedError: "missing action 'download'",
                },
-               // case 7
                {
                        endpoint:      "https://download-actions-map.io",
-                       expectederror: "",
+                       expectedError: "",
                },
-               // case 8
                {
                        endpoint:      "https://upload-actions-map.io",
-                       expectederror: "missing action 'download'",
+                       expectedError: "missing action 'download'",
                },
-               // case 9
                {
                        endpoint:      "https://verify-actions-map.io",
-                       expectederror: "missing action 'download'",
+                       expectedError: "missing action 'download'",
                },
-               // case 10
                {
                        endpoint:      "https://unknown-actions-map.io",
-                       expectederror: "missing action 'download'",
+                       expectedError: "missing action 'download'",
                },
-               // case 11
                {
                        endpoint:      "https://legacy-batch-request-download.io",
-                       expectederror: "",
+                       expectedError: "",
                },
        }
 
-       for n, c := range cases {
-               client := &HTTPClient{
-                       client:   hc,
-                       endpoint: c.endpoint,
-                       transfers: map[string]TransferAdapter{
-                               "dummy": dummy,
-                       },
-               }
+       defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)()
+       for _, c := range cases {
+               t.Run(c.endpoint, func(t *testing.T) {
+                       client := &HTTPClient{
+                               client:   hc,
+                               endpoint: c.endpoint,
+                               transfers: map[string]TransferAdapter{
+                                       "dummy": dummy,
+                               },
+                       }
 
-               err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error {
-                       if objectError != nil {
-                               return objectError
+                       err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error {
+                               if objectError != nil {
+                                       return objectError
+                               }
+                               b, err := io.ReadAll(content)
+                               assert.NoError(t, err)
+                               assert.Equal(t, []byte("dummy"), b)
+                               return nil
+                       })
+                       if c.expectedError != "" {
+                               assert.ErrorContains(t, err, c.expectedError)
+                       } else {
+                               assert.NoError(t, err)
                        }
-                       b, err := io.ReadAll(content)
-                       assert.NoError(t, err)
-                       assert.Equal(t, []byte("dummy"), b)
-                       return nil
                })
-               if len(c.expectederror) > 0 {
-                       assert.True(t, strings.Contains(err.Error(), c.expectederror), "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
-               } else {
-                       assert.NoError(t, err, "case %d", n)
-               }
        }
 }
 
@@ -296,81 +289,73 @@ func TestHTTPClientUpload(t *testing.T) {
 
        cases := []struct {
                endpoint      string
-               expectederror string
+               expectedError string
        }{
-               // case 0
                {
                        endpoint:      "https://status-not-ok.io",
-                       expectederror: io.ErrUnexpectedEOF.Error(),
+                       expectedError: io.ErrUnexpectedEOF.Error(),
                },
-               // case 1
                {
                        endpoint:      "https://invalid-json-response.io",
-                       expectederror: "invalid json",
+                       expectedError: "invalid json",
                },
-               // case 2
                {
                        endpoint:      "https://valid-batch-request-upload.io",
-                       expectederror: "",
+                       expectedError: "",
                },
-               // case 3
                {
                        endpoint:      "https://response-no-objects.io",
-                       expectederror: "",
+                       expectedError: "",
                },
-               // case 4
                {
                        endpoint:      "https://unknown-transfer-adapter.io",
-                       expectederror: "TransferAdapter not found: ",
+                       expectedError: "TransferAdapter not found: ",
                },
-               // case 5
                {
                        endpoint:      "https://error-in-response-objects.io",
-                       expectederror: "Object not found",
+                       expectedError: "Object not found",
                },
-               // case 6
                {
                        endpoint:      "https://empty-actions-map.io",
-                       expectederror: "",
+                       expectedError: "",
                },
-               // case 7
                {
                        endpoint:      "https://download-actions-map.io",
-                       expectederror: "missing action 'upload'",
+                       expectedError: "missing action 'upload'",
                },
-               // case 8
                {
                        endpoint:      "https://upload-actions-map.io",
-                       expectederror: "",
+                       expectedError: "",
                },
-               // case 9
                {
                        endpoint:      "https://verify-actions-map.io",
-                       expectederror: "missing action 'upload'",
+                       expectedError: "missing action 'upload'",
                },
-               // case 10
                {
                        endpoint:      "https://unknown-actions-map.io",
-                       expectederror: "missing action 'upload'",
+                       expectedError: "missing action 'upload'",
                },
        }
 
-       for n, c := range cases {
-               client := &HTTPClient{
-                       client:   hc,
-                       endpoint: c.endpoint,
-                       transfers: map[string]TransferAdapter{
-                               "dummy": dummy,
-                       },
-               }
+       defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)()
+       for _, c := range cases {
+               t.Run(c.endpoint, func(t *testing.T) {
+                       client := &HTTPClient{
+                               client:   hc,
+                               endpoint: c.endpoint,
+                               transfers: map[string]TransferAdapter{
+                                       "dummy": dummy,
+                               },
+                       }
 
-               err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) {
-                       return io.NopCloser(new(bytes.Buffer)), objectError
+                       err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) {
+                               return io.NopCloser(new(bytes.Buffer)), objectError
+                       })
+                       if c.expectedError != "" {
+                               assert.ErrorContains(t, err, c.expectedError)
+                       } else {
+                               assert.NoError(t, err)
+                       }
                })
-               if len(c.expectederror) > 0 {
-                       assert.True(t, strings.Contains(err.Error(), c.expectederror), "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
-               } else {
-                       assert.NoError(t, err, "case %d", n)
-               }
        }
 }
index def2220b17d12554ff9e3774aa8769ce739551ed..97b03433813270dced7281302ac186d71bc72484 100644 (file)
@@ -181,11 +181,12 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re
 
        downloadObjects := func(pointers []lfs.Pointer) error {
                err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
+                       if errors.Is(objectError, lfs.ErrObjectNotExist) {
+                               log.Warn("Ignoring missing upstream LFS object %-v: %v", p, objectError)
+                               return nil
+                       }
+
                        if objectError != nil {
-                               if errors.Is(objectError, lfs.ErrObjectNotExist) {
-                                       log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
-                                       return nil
-                               }
                                return objectError
                        }
 
index 24c49cabee93ae317e6be98fee6601c388d5206b..6b54ac0a60d86128fec4821e7e1b3482794d60e5 100644 (file)
@@ -28,7 +28,8 @@ var LFS = struct {
 
 // LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS
 var LFSClient = struct {
-       BatchSize int `ini:"BATCH_SIZE"`
+       BatchSize                 int `ini:"BATCH_SIZE"`
+       BatchOperationConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"`
 }{}
 
 func loadLFSFrom(rootCfg ConfigProvider) error {
@@ -66,6 +67,11 @@ func loadLFSFrom(rootCfg ConfigProvider) error {
                LFSClient.BatchSize = 20
        }
 
+       if LFSClient.BatchOperationConcurrency < 1 {
+               // match the default git-lfs's `lfs.concurrenttransfers`
+               LFSClient.BatchOperationConcurrency = 3
+       }
+
        LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour)
 
        if !LFS.StartServer || !InstallLock {
index f7beaaa9c797ef9d24c654018d0bc12618d0c47e..471fa8bff3c68a24b38d0b3938c86a0733646151 100644 (file)
@@ -114,4 +114,17 @@ BATCH_SIZE = 0
        assert.NoError(t, loadLFSFrom(cfg))
        assert.EqualValues(t, 100, LFS.MaxBatchSize)
        assert.EqualValues(t, 20, LFSClient.BatchSize)
+       assert.EqualValues(t, 3, LFSClient.BatchOperationConcurrency)
+
+       iniStr = `
+[lfs_client]
+BATCH_SIZE = 50
+BATCH_OPERATION_CONCURRENCY = 10
+`
+       cfg, err = NewConfigProviderFromData(iniStr)
+       assert.NoError(t, err)
+
+       assert.NoError(t, loadLFSFrom(cfg))
+       assert.EqualValues(t, 50, LFSClient.BatchSize)
+       assert.EqualValues(t, 10, LFSClient.BatchOperationConcurrency)
 }