;; override the azure blob base path if storage type is azureblob
;AZURE_BLOB_BASE_PATH = lfs/
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;; Settings for Gitea's LFS client (e.g. mirroring an upstream LFS endpoint)
+;;
;[lfs_client]
-;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number
+;; Limit the number of pointers in each batch request to this number
;BATCH_SIZE = 20
+;; Limit the number of concurrent upload/download operations within a batch
+;BATCH_OPERATION_CONCURRENCY = 3
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
golang.org/x/image v0.21.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
+ golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/tools v0.26.0
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
- golang.org/x/sync v0.8.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/setting"
+
+ "golang.org/x/sync/errgroup"
)
// HTTPClient is used to communicate with the LFS server
return c.performOperation(ctx, objects, nil, callback)
}
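+// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch.
+// Because the per-object operations run on multiple goroutines (bounded by setting.LFSClient.BatchOperationConcurrency),
+// the download/upload callbacks may be invoked concurrently and must be safe for concurrent use.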
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
if len(objects) == 0 {
return nil
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
}
+ errGroup, groupCtx := errgroup.WithContext(ctx)
+ errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency)
for _, object := range result.Objects {
- if object.Error != nil {
- log.Trace("Error on object %v: %v", object.Pointer, object.Error)
- if uc != nil {
- if _, err := uc(object.Pointer, object.Error); err != nil {
- return err
- }
- } else {
- if err := dc(object.Pointer, nil, object.Error); err != nil {
- return err
- }
- }
- continue
- }
+ errGroup.Go(func() error {
+ return performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
+ })
+ }
- if uc != nil {
- if len(object.Actions) == 0 {
- log.Trace("%v already present on server", object.Pointer)
- continue
- }
+ // Wait returns only the first non-nil error, which matches the behavior of the earlier sequential implementation
+ return errGroup.Wait()
+}
- link, ok := object.Actions["upload"]
- if !ok {
- log.Debug("%+v", object)
- return errors.New("missing action 'upload'")
- }
+// performSingleOperation performs an LFS upload or download operation on a single object
+func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
+ // the response from an LFS batch API request for this specific object id contained an error
+ if object.Error != nil {
+ log.Trace("Error on object %v: %v", object.Pointer, object.Error)
- content, err := uc(object.Pointer, nil)
- if err != nil {
+ // this was an 'upload' request inside the batch request
+ if uc != nil {
+ if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
-
- err = transferAdapter.Upload(ctx, link, object.Pointer, content)
- if err != nil {
+ } else {
+ // this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
+ if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
+ }
+ // if the callback returns no error, the object error is treated as ignorable and the remaining operations continue
+ return nil
+ }
- link, ok = object.Actions["verify"]
- if ok {
- if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
- return err
- }
- }
- } else {
- link, ok := object.Actions["download"]
- if !ok {
- // no actions block in response, try legacy response schema
- link, ok = object.Links["download"]
- }
- if !ok {
- log.Debug("%+v", object)
- return errors.New("missing action 'download'")
- }
+ // the response from an lfs batch api request contained necessary upload/download fields to act upon
+ if uc != nil {
+ if len(object.Actions) == 0 {
+ log.Trace("%v already present on server", object.Pointer)
+ return nil
+ }
- content, err := transferAdapter.Download(ctx, link)
- if err != nil {
- return err
- }
+ link, ok := object.Actions["upload"]
+ if !ok {
+ return errors.New("missing action 'upload'")
+ }
+
+ content, err := uc(object.Pointer, nil)
+ if err != nil {
+ return err
+ }
- if err := dc(object.Pointer, content, nil); err != nil {
+ err = transferAdapter.Upload(ctx, link, object.Pointer, content)
+ if err != nil {
+ return err
+ }
+
+ link, ok = object.Actions["verify"]
+ if ok {
+ if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
- }
+ } else {
+ link, ok := object.Actions["download"]
+ if !ok {
+ // no actions block in response, try legacy response schema
+ link, ok = object.Links["download"]
+ }
+ if !ok {
+ log.Debug("%+v", object)
+ return errors.New("missing action 'download'")
+ }
+ content, err := transferAdapter.Download(ctx, link)
+ if err != nil {
+ return err
+ }
+
+ if err := dc(object.Pointer, content, nil); err != nil {
+ return err
+ }
+ }
return nil
}
"testing"
"code.gitea.io/gitea/modules/json"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/test"
"github.com/stretchr/testify/assert"
)
cases := []struct {
endpoint string
- expectederror string
+ expectedError string
}{
- // case 0
{
endpoint: "https://status-not-ok.io",
- expectederror: io.ErrUnexpectedEOF.Error(),
+ expectedError: io.ErrUnexpectedEOF.Error(),
},
- // case 1
{
endpoint: "https://invalid-json-response.io",
- expectederror: "invalid json",
+ expectedError: "invalid json",
},
- // case 2
{
endpoint: "https://valid-batch-request-download.io",
- expectederror: "",
+ expectedError: "",
},
- // case 3
{
endpoint: "https://response-no-objects.io",
- expectederror: "",
+ expectedError: "",
},
- // case 4
{
endpoint: "https://unknown-transfer-adapter.io",
- expectederror: "TransferAdapter not found: ",
+ expectedError: "TransferAdapter not found: ",
},
- // case 5
{
endpoint: "https://error-in-response-objects.io",
- expectederror: "Object not found",
+ expectedError: "Object not found",
},
- // case 6
{
endpoint: "https://empty-actions-map.io",
- expectederror: "missing action 'download'",
+ expectedError: "missing action 'download'",
},
- // case 7
{
endpoint: "https://download-actions-map.io",
- expectederror: "",
+ expectedError: "",
},
- // case 8
{
endpoint: "https://upload-actions-map.io",
- expectederror: "missing action 'download'",
+ expectedError: "missing action 'download'",
},
- // case 9
{
endpoint: "https://verify-actions-map.io",
- expectederror: "missing action 'download'",
+ expectedError: "missing action 'download'",
},
- // case 10
{
endpoint: "https://unknown-actions-map.io",
- expectederror: "missing action 'download'",
+ expectedError: "missing action 'download'",
},
- // case 11
{
endpoint: "https://legacy-batch-request-download.io",
- expectederror: "",
+ expectedError: "",
},
}
- for n, c := range cases {
- client := &HTTPClient{
- client: hc,
- endpoint: c.endpoint,
- transfers: map[string]TransferAdapter{
- "dummy": dummy,
- },
- }
+ defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)()
+ for _, c := range cases {
+ t.Run(c.endpoint, func(t *testing.T) {
+ client := &HTTPClient{
+ client: hc,
+ endpoint: c.endpoint,
+ transfers: map[string]TransferAdapter{
+ "dummy": dummy,
+ },
+ }
- err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error {
- if objectError != nil {
- return objectError
+ err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error {
+ if objectError != nil {
+ return objectError
+ }
+ b, err := io.ReadAll(content)
+ assert.NoError(t, err)
+ assert.Equal(t, []byte("dummy"), b)
+ return nil
+ })
+ if c.expectedError != "" {
+ assert.ErrorContains(t, err, c.expectedError)
+ } else {
+ assert.NoError(t, err)
}
- b, err := io.ReadAll(content)
- assert.NoError(t, err)
- assert.Equal(t, []byte("dummy"), b)
- return nil
})
- if len(c.expectederror) > 0 {
- assert.True(t, strings.Contains(err.Error(), c.expectederror), "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
- } else {
- assert.NoError(t, err, "case %d", n)
- }
}
}
cases := []struct {
endpoint string
- expectederror string
+ expectedError string
}{
- // case 0
{
endpoint: "https://status-not-ok.io",
- expectederror: io.ErrUnexpectedEOF.Error(),
+ expectedError: io.ErrUnexpectedEOF.Error(),
},
- // case 1
{
endpoint: "https://invalid-json-response.io",
- expectederror: "invalid json",
+ expectedError: "invalid json",
},
- // case 2
{
endpoint: "https://valid-batch-request-upload.io",
- expectederror: "",
+ expectedError: "",
},
- // case 3
{
endpoint: "https://response-no-objects.io",
- expectederror: "",
+ expectedError: "",
},
- // case 4
{
endpoint: "https://unknown-transfer-adapter.io",
- expectederror: "TransferAdapter not found: ",
+ expectedError: "TransferAdapter not found: ",
},
- // case 5
{
endpoint: "https://error-in-response-objects.io",
- expectederror: "Object not found",
+ expectedError: "Object not found",
},
- // case 6
{
endpoint: "https://empty-actions-map.io",
- expectederror: "",
+ expectedError: "",
},
- // case 7
{
endpoint: "https://download-actions-map.io",
- expectederror: "missing action 'upload'",
+ expectedError: "missing action 'upload'",
},
- // case 8
{
endpoint: "https://upload-actions-map.io",
- expectederror: "",
+ expectedError: "",
},
- // case 9
{
endpoint: "https://verify-actions-map.io",
- expectederror: "missing action 'upload'",
+ expectedError: "missing action 'upload'",
},
- // case 10
{
endpoint: "https://unknown-actions-map.io",
- expectederror: "missing action 'upload'",
+ expectedError: "missing action 'upload'",
},
}
- for n, c := range cases {
- client := &HTTPClient{
- client: hc,
- endpoint: c.endpoint,
- transfers: map[string]TransferAdapter{
- "dummy": dummy,
- },
- }
+ defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)()
+ for _, c := range cases {
+ t.Run(c.endpoint, func(t *testing.T) {
+ client := &HTTPClient{
+ client: hc,
+ endpoint: c.endpoint,
+ transfers: map[string]TransferAdapter{
+ "dummy": dummy,
+ },
+ }
- err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) {
- return io.NopCloser(new(bytes.Buffer)), objectError
+ err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) {
+ return io.NopCloser(new(bytes.Buffer)), objectError
+ })
+ if c.expectedError != "" {
+ assert.ErrorContains(t, err, c.expectedError)
+ } else {
+ assert.NoError(t, err)
+ }
})
- if len(c.expectederror) > 0 {
- assert.True(t, strings.Contains(err.Error(), c.expectederror), "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
- } else {
- assert.NoError(t, err, "case %d", n)
- }
}
}
downloadObjects := func(pointers []lfs.Pointer) error {
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
+ if errors.Is(objectError, lfs.ErrObjectNotExist) {
+ log.Warn("Ignoring missing upstream LFS object %-v: %v", p, objectError)
+ return nil
+ }
+
if objectError != nil {
- if errors.Is(objectError, lfs.ErrObjectNotExist) {
- log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
- return nil
- }
return objectError
}
// LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS
var LFSClient = struct {
- BatchSize int `ini:"BATCH_SIZE"`
+ BatchSize int `ini:"BATCH_SIZE"`
+ BatchOperationConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"`
}{}
func loadLFSFrom(rootCfg ConfigProvider) error {
LFSClient.BatchSize = 20
}
+ if LFSClient.BatchOperationConcurrency < 1 {
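+ // match git-lfs's default value for `lfs.concurrenttransfers`
+ // NOTE(review): current git-lfs documentation appears to list a default of 8 for this setting - confirm that 3 is still intended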
+ LFSClient.BatchOperationConcurrency = 3
+ }
+
LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour)
if !LFS.StartServer || !InstallLock {
assert.NoError(t, loadLFSFrom(cfg))
assert.EqualValues(t, 100, LFS.MaxBatchSize)
assert.EqualValues(t, 20, LFSClient.BatchSize)
+ assert.EqualValues(t, 3, LFSClient.BatchOperationConcurrency)
+
+ iniStr = `
+[lfs_client]
+BATCH_SIZE = 50
+BATCH_OPERATION_CONCURRENCY = 10
+`
+ cfg, err = NewConfigProviderFromData(iniStr)
+ assert.NoError(t, err)
+
+ assert.NoError(t, loadLFSFrom(cfg))
+ assert.EqualValues(t, 50, LFSClient.BatchSize)
+ assert.EqualValues(t, 10, LFSClient.BatchOperationConcurrency)
}