From 440039c0cce18622b12da5677bf6585caed6070a Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Mon, 14 Jun 2021 19:20:43 +0200 Subject: Add push to remote mirror repository (#15157) * Added push mirror model. * Integrated push mirror into queue. * Moved methods into own file. * Added basic implementation. * Mirror wiki too. * Removed duplicated method. * Get url for different remotes. * Added migration. * Unified remote url access. * Add/Remove push mirror remotes. * Prevent hangs with missing credentials. * Moved code between files. * Changed sanitizer interface. * Added push mirror backend methods. * Only update the mirror remote. * Limit refs on push. * Added UI part. * Added missing table. * Delete mirror if repository gets removed. * Changed signature. Handle object errors. * Added upload method. * Added "upload" unit tests. * Added transfer adapter unit tests. * Send correct headers. * Added pushing of LFS objects. * Added more logging. * Simpler body handling. * Process files in batches to reduce HTTP calls. * Added created timestamp. * Fixed invalid column name. * Changed name to prevent xorm auto setting. * Remove table header im empty. * Strip exit code from error message. * Added docs page about mirroring. * Fixed date. * Fixed merge errors. * Moved test to integrations. * Added push mirror test. * Added test. --- modules/repository/repo.go | 115 ++++++++++++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 44 deletions(-) (limited to 'modules/repository') diff --git a/modules/repository/repo.go b/modules/repository/repo.go index 50eb185daa..08531c04ed 100644 --- a/modules/repository/repo.go +++ b/modules/repository/repo.go @@ -7,6 +7,7 @@ package repository import ( "context" "fmt" + "io" "net/url" "path" "strings" @@ -323,64 +324,90 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *models.Reposi errChan := make(chan error, 1) go lfs.SearchPointerBlobs(ctx, gitRepo, pointerChan, errChan) - err := func() error { - for pointerBlob := range pointerChan { - meta, err := models.NewLFSMetaObject(&models.LFSMetaObject{Pointer: pointerBlob.Pointer, RepositoryID: repo.ID}) - if err != nil { - return fmt.Errorf("StoreMissingLfsObjectsInRepository models.NewLFSMetaObject: %w", err) - } - if meta.Existing { - continue + downloadObjects := func(pointers []lfs.Pointer) error { + err := client.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error { + if objectError != nil { + return objectError } - log.Trace("StoreMissingLfsObjectsInRepository: LFS OID[%s] not present in repository %s", pointerBlob.Oid, repo.FullName()) + defer content.Close() - err = func() error { - exist, err := contentStore.Exists(pointerBlob.Pointer) - if err != nil { - return fmt.Errorf("StoreMissingLfsObjectsInRepository contentStore.Exists: %w", err) - } - if !exist { - if setting.LFS.MaxFileSize > 0 && pointerBlob.Size > setting.LFS.MaxFileSize { - log.Info("LFS OID[%s] download denied because of LFS_MAX_FILE_SIZE=%d < size %d", pointerBlob.Oid, setting.LFS.MaxFileSize, pointerBlob.Size) - return nil - } - - stream, err := client.Download(ctx, pointerBlob.Oid, pointerBlob.Size) - if err != nil { - return fmt.Errorf("StoreMissingLfsObjectsInRepository: LFS OID[%s] failed to download: %w", pointerBlob.Oid, err) - } - defer stream.Close() - - if err := contentStore.Put(pointerBlob.Pointer, stream); err != nil { - return fmt.Errorf("StoreMissingLfsObjectsInRepository LFS OID[%s] contentStore.Put: %w", pointerBlob.Oid, err) - } - } else { - log.Trace("StoreMissingLfsObjectsInRepository: LFS OID[%s] already present in content store", pointerBlob.Oid) - } - return nil - }() + _, err := models.NewLFSMetaObject(&models.LFSMetaObject{Pointer: p, RepositoryID: repo.ID}) if err != nil { - if _, err2 := repo.RemoveLFSMetaObjectByOid(meta.Oid); err2 != nil { - log.Error("StoreMissingLfsObjectsInRepository RemoveLFSMetaObjectByOid[Oid: %s]: %w", meta.Oid, err2) - } + log.Error("Error creating LFS meta object %v: %v", p, err) + return err + } - select { - case <-ctx.Done(): - return nil - default: + if err := contentStore.Put(p, content); err != nil { + log.Error("Error storing content for LFS meta object %v: %v", p, err) + if _, err2 := repo.RemoveLFSMetaObjectByOid(p.Oid); err2 != nil { + log.Error("Error removing LFS meta object %v: %v", p, err2) } return err } + return nil + }) + if err != nil { + select { + case <-ctx.Done(): + return nil + default: + } } - return nil - }() - if err != nil { return err } + var batch []lfs.Pointer + for pointerBlob := range pointerChan { + meta, err := repo.GetLFSMetaObjectByOid(pointerBlob.Oid) + if err != nil && err != models.ErrLFSObjectNotExist { + log.Error("Error querying LFS meta object %v: %v", pointerBlob.Pointer, err) + return err + } + if meta != nil { + log.Trace("Skipping unknown LFS meta object %v", pointerBlob.Pointer) + continue + } + + log.Trace("LFS object %v not present in repository %s", pointerBlob.Pointer, repo.FullName()) + + exist, err := contentStore.Exists(pointerBlob.Pointer) + if err != nil { + log.Error("Error checking if LFS object %v exists: %v", pointerBlob.Pointer, err) + return err + } + + if exist { + log.Trace("LFS object %v already present; creating meta object", pointerBlob.Pointer) + _, err := models.NewLFSMetaObject(&models.LFSMetaObject{Pointer: pointerBlob.Pointer, RepositoryID: repo.ID}) + if err != nil { + log.Error("Error creating LFS meta object %v: %v", pointerBlob.Pointer, err) + return err + } + } else { + if setting.LFS.MaxFileSize > 0 && pointerBlob.Size > setting.LFS.MaxFileSize { + log.Info("LFS object %v download denied because of LFS_MAX_FILE_SIZE=%d < size %d", pointerBlob.Pointer, setting.LFS.MaxFileSize, pointerBlob.Size) + continue + } + + batch = append(batch, pointerBlob.Pointer) + if len(batch) >= client.BatchSize() { + if err := downloadObjects(batch); err != nil { + return err + } + batch = nil + } + } + } + if len(batch) > 0 { + if err := downloadObjects(batch); err != nil { + return err + } + } + err, has := <-errChan if has { + log.Error("Error enumerating LFS objects for repository: %v", err) return err } -- cgit v1.2.3