Diffstat (limited to 'services/archiver/archiver.go')
-rw-r--r-- | services/archiver/archiver.go | 394
1 file changed, 162 insertions, 232 deletions
diff --git a/services/archiver/archiver.go b/services/archiver/archiver.go
index dfa6334d95..00c0281306 100644
--- a/services/archiver/archiver.go
+++ b/services/archiver/archiver.go
@@ -6,22 +6,20 @@ package archiver
 
 import (
+	"errors"
+	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
-	"path"
 	"regexp"
 	"strings"
-	"sync"
-	"time"
 
-	"code.gitea.io/gitea/modules/base"
-	"code.gitea.io/gitea/modules/context"
+	"code.gitea.io/gitea/models"
 	"code.gitea.io/gitea/modules/git"
 	"code.gitea.io/gitea/modules/graceful"
 	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/queue"
 	"code.gitea.io/gitea/modules/setting"
-	"code.gitea.io/gitea/modules/util"
+	"code.gitea.io/gitea/modules/storage"
 )
 
 // ArchiveRequest defines the parameters of an archive request, which notably
@@ -30,223 +28,174 @@ import (
 // This is entirely opaque to external entities, though, and mostly used as a
 // handle elsewhere.
 type ArchiveRequest struct {
-	uri             string
-	repo            *git.Repository
-	refName         string
-	ext             string
-	archivePath     string
-	archiveType     git.ArchiveType
-	archiveComplete bool
-	commit          *git.Commit
-	cchan           chan struct{}
+	RepoID   int64
+	refName  string
+	Type     git.ArchiveType
+	CommitID string
 }
 
-var archiveInProgress []*ArchiveRequest
-var archiveMutex sync.Mutex
-
 // SHA1 hashes will only go up to 40 characters, but SHA256 hashes will go all
 // the way to 64.
 var shaRegex = regexp.MustCompile(`^[0-9a-f]{4,64}$`)
 
-// These facilitate testing, by allowing the unit tests to control (to some extent)
-// the goroutine used for processing the queue.
-var archiveQueueMutex *sync.Mutex
-var archiveQueueStartCond *sync.Cond
-var archiveQueueReleaseCond *sync.Cond
-
-// GetArchivePath returns the path from which we can serve this archive.
-func (aReq *ArchiveRequest) GetArchivePath() string {
-	return aReq.archivePath
-}
-
-// GetArchiveName returns the name of the caller, based on the ref used by the
-// caller to create this request.
-func (aReq *ArchiveRequest) GetArchiveName() string {
-	return aReq.refName + aReq.ext
-}
-
-// IsComplete returns the completion status of this request.
-func (aReq *ArchiveRequest) IsComplete() bool {
-	return aReq.archiveComplete
-}
-
-// WaitForCompletion will wait for this request to complete, with no timeout.
-// It returns whether the archive was actually completed, as the channel could
-// have also been closed due to an error.
-func (aReq *ArchiveRequest) WaitForCompletion(ctx *context.Context) bool {
-	select {
-	case <-aReq.cchan:
-	case <-ctx.Done():
-	}
-
-	return aReq.IsComplete()
-}
-
-// TimedWaitForCompletion will wait for this request to complete, with timeout
-// happening after the specified Duration. It returns whether the archive is
-// now complete and whether we hit the timeout or not. The latter may not be
-// useful if the request is complete or we started to shutdown.
-func (aReq *ArchiveRequest) TimedWaitForCompletion(ctx *context.Context, dur time.Duration) (bool, bool) {
-	timeout := false
-	select {
-	case <-time.After(dur):
-		timeout = true
-	case <-aReq.cchan:
-	case <-ctx.Done():
-	}
-
-	return aReq.IsComplete(), timeout
-}
-
-// The caller must hold the archiveMutex across calls to getArchiveRequest.
-func getArchiveRequest(repo *git.Repository, commit *git.Commit, archiveType git.ArchiveType) *ArchiveRequest {
-	for _, r := range archiveInProgress {
-		// Need to be referring to the same repository.
-		if r.repo.Path == repo.Path && r.commit.ID == commit.ID && r.archiveType == archiveType {
-			return r
-		}
-	}
-	return nil
-}
-
-// DeriveRequestFrom creates an archival request, based on the URI. The
+// NewRequest creates an archival request, based on the URI. The
 // resulting ArchiveRequest is suitable for being passed to ArchiveRepository()
 // if it's determined that the request still needs to be satisfied.
-func DeriveRequestFrom(ctx *context.Context, uri string) *ArchiveRequest {
-	if ctx.Repo == nil || ctx.Repo.GitRepo == nil {
-		log.Trace("Repo not initialized")
-		return nil
-	}
+func NewRequest(repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) {
 	r := &ArchiveRequest{
-		uri:  uri,
-		repo: ctx.Repo.GitRepo,
+		RepoID: repoID,
 	}
 
+	var ext string
 	switch {
 	case strings.HasSuffix(uri, ".zip"):
-		r.ext = ".zip"
-		r.archivePath = path.Join(r.repo.Path, "archives/zip")
-		r.archiveType = git.ZIP
+		ext = ".zip"
+		r.Type = git.ZIP
 	case strings.HasSuffix(uri, ".tar.gz"):
-		r.ext = ".tar.gz"
-		r.archivePath = path.Join(r.repo.Path, "archives/targz")
-		r.archiveType = git.TARGZ
+		ext = ".tar.gz"
+		r.Type = git.TARGZ
 	default:
-		log.Trace("Unknown format: %s", uri)
-		return nil
+		return nil, fmt.Errorf("Unknown format: %s", uri)
 	}
 
-	r.refName = strings.TrimSuffix(r.uri, r.ext)
-	isDir, err := util.IsDir(r.archivePath)
-	if err != nil {
-		ctx.ServerError("Download -> util.IsDir(archivePath)", err)
-		return nil
-	}
-	if !isDir {
-		if err := os.MkdirAll(r.archivePath, os.ModePerm); err != nil {
-			ctx.ServerError("Download -> os.MkdirAll(archivePath)", err)
-			return nil
-		}
-	}
+	r.refName = strings.TrimSuffix(uri, ext)
 
+	var err error
 	// Get corresponding commit.
-	if r.repo.IsBranchExist(r.refName) {
-		r.commit, err = r.repo.GetBranchCommit(r.refName)
+	if repo.IsBranchExist(r.refName) {
+		r.CommitID, err = repo.GetBranchCommitID(r.refName)
 		if err != nil {
-			ctx.ServerError("GetBranchCommit", err)
-			return nil
+			return nil, err
 		}
-	} else if r.repo.IsTagExist(r.refName) {
-		r.commit, err = r.repo.GetTagCommit(r.refName)
+	} else if repo.IsTagExist(r.refName) {
+		r.CommitID, err = repo.GetTagCommitID(r.refName)
 		if err != nil {
-			ctx.ServerError("GetTagCommit", err)
-			return nil
+			return nil, err
 		}
 	} else if shaRegex.MatchString(r.refName) {
-		r.commit, err = r.repo.GetCommit(r.refName)
-		if err != nil {
-			ctx.NotFound("GetCommit", nil)
-			return nil
+		if repo.IsCommitExist(r.refName) {
+			r.CommitID = r.refName
+		} else {
+			return nil, git.ErrNotExist{
+				ID: r.refName,
+			}
 		}
 	} else {
-		ctx.NotFound("DeriveRequestFrom", nil)
-		return nil
+		return nil, fmt.Errorf("Unknown ref %s type", r.refName)
 	}
 
-	archiveMutex.Lock()
-	defer archiveMutex.Unlock()
-	if rExisting := getArchiveRequest(r.repo, r.commit, r.archiveType); rExisting != nil {
-		return rExisting
-	}
+	return r, nil
+}
+
+// GetArchiveName returns the name of the caller, based on the ref used by the
+// caller to create this request.
+func (aReq *ArchiveRequest) GetArchiveName() string {
+	return strings.ReplaceAll(aReq.refName, "/", "-") + "." + aReq.Type.String()
+}
 
-	r.archivePath = path.Join(r.archivePath, base.ShortSha(r.commit.ID.String())+r.ext)
-	r.archiveComplete, err = util.IsFile(r.archivePath)
+func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) {
+	ctx, commiter, err := models.TxDBContext()
 	if err != nil {
-		ctx.ServerError("util.IsFile", err)
-		return nil
+		return nil, err
 	}
-	return r
-}
+	defer commiter.Close()
 
-func doArchive(r *ArchiveRequest) {
-	var (
-		err         error
-		tmpArchive  *os.File
-		destArchive *os.File
-	)
-
-	// Close the channel to indicate to potential waiters that this request
-	// has finished.
-	defer close(r.cchan)
-
-	// It could have happened that we enqueued two archival requests, due to
-	// race conditions and difficulties in locking. Do one last check that
-	// the archive we're referring to doesn't already exist. If it does exist,
-	// then just mark the request as complete and move on.
-	isFile, err := util.IsFile(r.archivePath)
+	archiver, err := models.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
 	if err != nil {
-		log.Error("Unable to check if %s util.IsFile: %v. Will ignore and recreate.", r.archivePath, err)
+		return nil, err
 	}
-	if isFile {
-		r.archiveComplete = true
-		return
+
+	if archiver != nil {
+		// FIXME: If another process is generating it, we consider it not ready and just return.
+		// Alternatively, we could wait until the archive has been generated.
+		if archiver.Status == models.RepoArchiverGenerating {
+			return nil, nil
+		}
+	} else {
+		archiver = &models.RepoArchiver{
+			RepoID:   r.RepoID,
+			Type:     r.Type,
+			CommitID: r.CommitID,
+			Status:   models.RepoArchiverGenerating,
+		}
+		if err := models.AddRepoArchiver(ctx, archiver); err != nil {
+			return nil, err
+		}
 	}
 
-	// Create a temporary file to use while the archive is being built. We
-	// will then copy it into place (r.archivePath) once it's fully
-	// constructed.
-	tmpArchive, err = ioutil.TempFile("", "archive")
+	rPath, err := archiver.RelativePath()
 	if err != nil {
-		log.Error("Unable to create a temporary archive file! Error: %v", err)
-		return
+		return nil, err
+	}
+
+	_, err = storage.RepoArchives.Stat(rPath)
+	if err == nil {
+		if archiver.Status == models.RepoArchiverGenerating {
+			archiver.Status = models.RepoArchiverReady
+			return archiver, models.UpdateRepoArchiverStatus(ctx, archiver)
+		}
+		return archiver, nil
+	}
+
+	if !errors.Is(err, os.ErrNotExist) {
+		return nil, fmt.Errorf("unable to stat archive: %v", err)
 	}
+
+	rd, w := io.Pipe()
 	defer func() {
-		tmpArchive.Close()
-		os.Remove(tmpArchive.Name())
+		w.Close()
+		rd.Close()
 	}()
+	var done = make(chan error)
+	repo, err := archiver.LoadRepo()
+	if err != nil {
+		return nil, fmt.Errorf("archiver.LoadRepo failed: %v", err)
+	}
 
-	if err = r.commit.CreateArchive(graceful.GetManager().ShutdownContext(), tmpArchive.Name(), git.CreateArchiveOpts{
-		Format: r.archiveType,
-		Prefix: setting.Repository.PrefixArchiveFiles,
-	}); err != nil {
-		log.Error("Download -> CreateArchive "+tmpArchive.Name(), err)
-		return
+	gitRepo, err := git.OpenRepository(repo.RepoPath())
+	if err != nil {
+		return nil, err
 	}
+	defer gitRepo.Close()
+
+	go func(done chan error, w *io.PipeWriter, archiver *models.RepoArchiver, gitRepo *git.Repository) {
+		defer func() {
+			if r := recover(); r != nil {
+				done <- fmt.Errorf("%v", r)
+			}
+		}()
+
+		err = gitRepo.CreateArchive(
+			graceful.GetManager().ShutdownContext(),
+			archiver.Type,
+			w,
+			setting.Repository.PrefixArchiveFiles,
+			archiver.CommitID,
+		)
+		_ = w.CloseWithError(err)
+		done <- err
+	}(done, w, archiver, gitRepo)
+
+	// TODO: add lfs data to zip
+	// TODO: add submodule data to zip
 
-	// Now we copy it into place
-	if destArchive, err = os.Create(r.archivePath); err != nil {
-		log.Error("Unable to open archive " + r.archivePath)
-		return
+	if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil {
+		return nil, fmt.Errorf("unable to write archive: %v", err)
 	}
-	_, err = io.Copy(destArchive, tmpArchive)
-	destArchive.Close()
+
+	err = <-done
 	if err != nil {
-		log.Error("Unable to write archive " + r.archivePath)
-		return
+		return nil, err
+	}
+
+	if archiver.Status == models.RepoArchiverGenerating {
+		archiver.Status = models.RepoArchiverReady
+		if err = models.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
+			return nil, err
+		}
 	}
 
-	// Block any attempt to finalize creating a new request if we're marking
-	r.archiveComplete = true
+	return archiver, commiter.Commit()
 }
 
 // ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
@@ -255,65 +204,46 @@ func doArchive(r *ArchiveRequest) {
 // anything. In all cases, the caller should be examining the *ArchiveRequest
 // being returned for completion, as it may be different than the one they passed
 // in.
-func ArchiveRepository(request *ArchiveRequest) *ArchiveRequest {
-	// We'll return the request that's already been enqueued if it has been
-	// enqueued, or we'll immediately enqueue it if it has not been enqueued
-	// and it is not marked complete.
-	archiveMutex.Lock()
-	defer archiveMutex.Unlock()
-	if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil {
-		return rExisting
-	}
-	if request.archiveComplete {
-		return request
-	}
+func ArchiveRepository(request *ArchiveRequest) (*models.RepoArchiver, error) {
+	return doArchive(request)
+}
+
+var archiverQueue queue.UniqueQueue
 
-	request.cchan = make(chan struct{})
-	archiveInProgress = append(archiveInProgress, request)
-	go func() {
-		// Wait to start, if we have the Cond for it. This is currently only
-		// useful for testing, so that the start and release of queued entries
-		// can be controlled to examine the queue.
-		if archiveQueueStartCond != nil {
-			archiveQueueMutex.Lock()
-			archiveQueueStartCond.Wait()
-			archiveQueueMutex.Unlock()
+// Init initializes the archive queue
+func Init() error {
	handler := func(data ...queue.Data) {
+		for _, datum := range data {
+			archiveReq, ok := datum.(*ArchiveRequest)
+			if !ok {
+				log.Error("Unable to process provided datum: %v - not possible to cast to ArchiveRequest", datum)
+				continue
+			}
+			log.Trace("ArchiverData Process: %#v", archiveReq)
+			if _, err := doArchive(archiveReq); err != nil {
+				log.Error("Archive %v failed: %v", datum, err)
+			}
 		}
+	}
 
-		// Drop the mutex while we process the request. This may take a long
-		// time, and it's not necessary now that we've added the reequest to
-		// archiveInProgress.
-		doArchive(request)
+	archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest))
+	if archiverQueue == nil {
+		return errors.New("unable to create repo-archive queue")
+	}
 
-		if archiveQueueReleaseCond != nil {
-			archiveQueueMutex.Lock()
-			archiveQueueReleaseCond.Wait()
-			archiveQueueMutex.Unlock()
-		}
+	go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run)
 
-		// Purge this request from the list. To do so, we'll just take the
-		// index at which we ended up at and swap the final element into that
-		// position, then chop off the now-redundant final element. The slice
-		// may have change in between these two segments and we may have moved,
-		// so we search for it here. We could perhaps avoid this search
-		// entirely if len(archiveInProgress) == 1, but we should verify
-		// correctness.
-		archiveMutex.Lock()
-		defer archiveMutex.Unlock()
-
-		idx := -1
-		for _idx, req := range archiveInProgress {
-			if req == request {
-				idx = _idx
-				break
-			}
-		}
-		if idx == -1 {
-			log.Error("ArchiveRepository: Failed to find request for removal.")
-			return
-		}
-		archiveInProgress = append(archiveInProgress[:idx], archiveInProgress[idx+1:]...)
-	}()
+	return nil
+}
 
-	return request
+// StartArchive pushes the archive request to the queue
+func StartArchive(request *ArchiveRequest) error {
+	has, err := archiverQueue.Has(request)
+	if err != nil {
+		return err
+	}
+	if has {
+		return nil
+	}
+	return archiverQueue.Push(request)
 }
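For context, here is a rough sketch of how a caller might drive the API introduced by this change. The names and signatures (NewRequest, ArchiveRepository, StartArchive) come from the diff above; the surrounding wiring (where repoID and the open *git.Repository come from, and that Init has already been called so the queue exists) is assumed and is not part of this change.

package example

import (
	"code.gitea.io/gitea/modules/git"
	archiver_service "code.gitea.io/gitea/services/archiver"
)

// requestArchive resolves "<ref>.zip" or "<ref>.tar.gz" into an ArchiveRequest
// and hands it to the archiver service. repoID and gitRepo are assumed to be
// supplied by the caller (e.g. a router handler).
func requestArchive(repoID int64, gitRepo *git.Repository, uri string, async bool) error {
	req, err := archiver_service.NewRequest(repoID, gitRepo, uri)
	if err != nil {
		return err
	}
	if async {
		// Push onto the unique queue; a request that is already queued
		// is skipped by StartArchive.
		return archiver_service.StartArchive(req)
	}
	// Generate (or reuse) the archive in the current goroutine.
	_, err = archiver_service.ArchiveRepository(req)
	return err
}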
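The new doArchive no longer stages the archive in a temporary file; it streams git's output through an io.Pipe straight into the storage layer. The following is a minimal, standalone sketch of that producer/consumer pattern only, with a gzip writer standing in for gitRepo.CreateArchive and a local file standing in for storage.RepoArchives.Save.

package main

import (
	"compress/gzip"
	"io"
	"log"
	"os"
)

func main() {
	rd, w := io.Pipe()
	done := make(chan error, 1)

	// Producer: writes the archive into the pipe, then closes the writer with
	// the producer's error so the consumer sees a failure instead of a short read.
	go func() {
		gz := gzip.NewWriter(w)
		_, err := gz.Write([]byte("pretend this is the repository archive"))
		if cerr := gz.Close(); err == nil {
			err = cerr
		}
		_ = w.CloseWithError(err)
		done <- err
	}()

	// Consumer: stores the archive while it is being produced; no temp file needed.
	out, err := os.Create("archive.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()
	if _, err := io.Copy(out, rd); err != nil {
		log.Fatal(err)
	}
	if err := <-done; err != nil {
		log.Fatal(err)
	}
}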