diff options
Diffstat (limited to 'services/archiver')
-rw-r--r-- | services/archiver/archiver.go | 394 | ||||
-rw-r--r-- | services/archiver/archiver_test.go | 159 |
2 files changed, 197 insertions, 356 deletions
diff --git a/services/archiver/archiver.go b/services/archiver/archiver.go index dfa6334d95..00c0281306 100644 --- a/services/archiver/archiver.go +++ b/services/archiver/archiver.go @@ -6,22 +6,20 @@ package archiver import ( + "errors" + "fmt" "io" - "io/ioutil" "os" - "path" "regexp" "strings" - "sync" - "time" - "code.gitea.io/gitea/modules/base" - "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/modules/storage" ) // ArchiveRequest defines the parameters of an archive request, which notably @@ -30,223 +28,174 @@ import ( // This is entirely opaque to external entities, though, and mostly used as a // handle elsewhere. type ArchiveRequest struct { - uri string - repo *git.Repository - refName string - ext string - archivePath string - archiveType git.ArchiveType - archiveComplete bool - commit *git.Commit - cchan chan struct{} + RepoID int64 + refName string + Type git.ArchiveType + CommitID string } -var archiveInProgress []*ArchiveRequest -var archiveMutex sync.Mutex - // SHA1 hashes will only go up to 40 characters, but SHA256 hashes will go all // the way to 64. var shaRegex = regexp.MustCompile(`^[0-9a-f]{4,64}$`) -// These facilitate testing, by allowing the unit tests to control (to some extent) -// the goroutine used for processing the queue. -var archiveQueueMutex *sync.Mutex -var archiveQueueStartCond *sync.Cond -var archiveQueueReleaseCond *sync.Cond - -// GetArchivePath returns the path from which we can serve this archive. -func (aReq *ArchiveRequest) GetArchivePath() string { - return aReq.archivePath -} - -// GetArchiveName returns the name of the caller, based on the ref used by the -// caller to create this request. -func (aReq *ArchiveRequest) GetArchiveName() string { - return aReq.refName + aReq.ext -} - -// IsComplete returns the completion status of this request. -func (aReq *ArchiveRequest) IsComplete() bool { - return aReq.archiveComplete -} - -// WaitForCompletion will wait for this request to complete, with no timeout. -// It returns whether the archive was actually completed, as the channel could -// have also been closed due to an error. -func (aReq *ArchiveRequest) WaitForCompletion(ctx *context.Context) bool { - select { - case <-aReq.cchan: - case <-ctx.Done(): - } - - return aReq.IsComplete() -} - -// TimedWaitForCompletion will wait for this request to complete, with timeout -// happening after the specified Duration. It returns whether the archive is -// now complete and whether we hit the timeout or not. The latter may not be -// useful if the request is complete or we started to shutdown. -func (aReq *ArchiveRequest) TimedWaitForCompletion(ctx *context.Context, dur time.Duration) (bool, bool) { - timeout := false - select { - case <-time.After(dur): - timeout = true - case <-aReq.cchan: - case <-ctx.Done(): - } - - return aReq.IsComplete(), timeout -} - -// The caller must hold the archiveMutex across calls to getArchiveRequest. -func getArchiveRequest(repo *git.Repository, commit *git.Commit, archiveType git.ArchiveType) *ArchiveRequest { - for _, r := range archiveInProgress { - // Need to be referring to the same repository. - if r.repo.Path == repo.Path && r.commit.ID == commit.ID && r.archiveType == archiveType { - return r - } - } - return nil -} - -// DeriveRequestFrom creates an archival request, based on the URI. The +// NewRequest creates an archival request, based on the URI. The // resulting ArchiveRequest is suitable for being passed to ArchiveRepository() // if it's determined that the request still needs to be satisfied. -func DeriveRequestFrom(ctx *context.Context, uri string) *ArchiveRequest { - if ctx.Repo == nil || ctx.Repo.GitRepo == nil { - log.Trace("Repo not initialized") - return nil - } +func NewRequest(repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) { r := &ArchiveRequest{ - uri: uri, - repo: ctx.Repo.GitRepo, + RepoID: repoID, } + var ext string switch { case strings.HasSuffix(uri, ".zip"): - r.ext = ".zip" - r.archivePath = path.Join(r.repo.Path, "archives/zip") - r.archiveType = git.ZIP + ext = ".zip" + r.Type = git.ZIP case strings.HasSuffix(uri, ".tar.gz"): - r.ext = ".tar.gz" - r.archivePath = path.Join(r.repo.Path, "archives/targz") - r.archiveType = git.TARGZ + ext = ".tar.gz" + r.Type = git.TARGZ default: - log.Trace("Unknown format: %s", uri) - return nil + return nil, fmt.Errorf("Unknown format: %s", uri) } - r.refName = strings.TrimSuffix(r.uri, r.ext) - isDir, err := util.IsDir(r.archivePath) - if err != nil { - ctx.ServerError("Download -> util.IsDir(archivePath)", err) - return nil - } - if !isDir { - if err := os.MkdirAll(r.archivePath, os.ModePerm); err != nil { - ctx.ServerError("Download -> os.MkdirAll(archivePath)", err) - return nil - } - } + r.refName = strings.TrimSuffix(uri, ext) + var err error // Get corresponding commit. - if r.repo.IsBranchExist(r.refName) { - r.commit, err = r.repo.GetBranchCommit(r.refName) + if repo.IsBranchExist(r.refName) { + r.CommitID, err = repo.GetBranchCommitID(r.refName) if err != nil { - ctx.ServerError("GetBranchCommit", err) - return nil + return nil, err } - } else if r.repo.IsTagExist(r.refName) { - r.commit, err = r.repo.GetTagCommit(r.refName) + } else if repo.IsTagExist(r.refName) { + r.CommitID, err = repo.GetTagCommitID(r.refName) if err != nil { - ctx.ServerError("GetTagCommit", err) - return nil + return nil, err } } else if shaRegex.MatchString(r.refName) { - r.commit, err = r.repo.GetCommit(r.refName) - if err != nil { - ctx.NotFound("GetCommit", nil) - return nil + if repo.IsCommitExist(r.refName) { + r.CommitID = r.refName + } else { + return nil, git.ErrNotExist{ + ID: r.refName, + } } } else { - ctx.NotFound("DeriveRequestFrom", nil) - return nil + return nil, fmt.Errorf("Unknow ref %s type", r.refName) } - archiveMutex.Lock() - defer archiveMutex.Unlock() - if rExisting := getArchiveRequest(r.repo, r.commit, r.archiveType); rExisting != nil { - return rExisting - } + return r, nil +} + +// GetArchiveName returns the name of the caller, based on the ref used by the +// caller to create this request. +func (aReq *ArchiveRequest) GetArchiveName() string { + return strings.ReplaceAll(aReq.refName, "/", "-") + "." + aReq.Type.String() +} - r.archivePath = path.Join(r.archivePath, base.ShortSha(r.commit.ID.String())+r.ext) - r.archiveComplete, err = util.IsFile(r.archivePath) +func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { + ctx, commiter, err := models.TxDBContext() if err != nil { - ctx.ServerError("util.IsFile", err) - return nil + return nil, err } - return r -} + defer commiter.Close() -func doArchive(r *ArchiveRequest) { - var ( - err error - tmpArchive *os.File - destArchive *os.File - ) - - // Close the channel to indicate to potential waiters that this request - // has finished. - defer close(r.cchan) - - // It could have happened that we enqueued two archival requests, due to - // race conditions and difficulties in locking. Do one last check that - // the archive we're referring to doesn't already exist. If it does exist, - // then just mark the request as complete and move on. - isFile, err := util.IsFile(r.archivePath) + archiver, err := models.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID) if err != nil { - log.Error("Unable to check if %s util.IsFile: %v. Will ignore and recreate.", r.archivePath, err) + return nil, err } - if isFile { - r.archiveComplete = true - return + + if archiver != nil { + // FIXME: If another process are generating it, we think it's not ready and just return + // Or we should wait until the archive generated. + if archiver.Status == models.RepoArchiverGenerating { + return nil, nil + } + } else { + archiver = &models.RepoArchiver{ + RepoID: r.RepoID, + Type: r.Type, + CommitID: r.CommitID, + Status: models.RepoArchiverGenerating, + } + if err := models.AddRepoArchiver(ctx, archiver); err != nil { + return nil, err + } } - // Create a temporary file to use while the archive is being built. We - // will then copy it into place (r.archivePath) once it's fully - // constructed. - tmpArchive, err = ioutil.TempFile("", "archive") + rPath, err := archiver.RelativePath() if err != nil { - log.Error("Unable to create a temporary archive file! Error: %v", err) - return + return nil, err + } + + _, err = storage.RepoArchives.Stat(rPath) + if err == nil { + if archiver.Status == models.RepoArchiverGenerating { + archiver.Status = models.RepoArchiverReady + return archiver, models.UpdateRepoArchiverStatus(ctx, archiver) + } + return archiver, nil + } + + if !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("unable to stat archive: %v", err) } + + rd, w := io.Pipe() defer func() { - tmpArchive.Close() - os.Remove(tmpArchive.Name()) + w.Close() + rd.Close() }() + var done = make(chan error) + repo, err := archiver.LoadRepo() + if err != nil { + return nil, fmt.Errorf("archiver.LoadRepo failed: %v", err) + } - if err = r.commit.CreateArchive(graceful.GetManager().ShutdownContext(), tmpArchive.Name(), git.CreateArchiveOpts{ - Format: r.archiveType, - Prefix: setting.Repository.PrefixArchiveFiles, - }); err != nil { - log.Error("Download -> CreateArchive "+tmpArchive.Name(), err) - return + gitRepo, err := git.OpenRepository(repo.RepoPath()) + if err != nil { + return nil, err } + defer gitRepo.Close() + + go func(done chan error, w *io.PipeWriter, archiver *models.RepoArchiver, gitRepo *git.Repository) { + defer func() { + if r := recover(); r != nil { + done <- fmt.Errorf("%v", r) + } + }() + + err = gitRepo.CreateArchive( + graceful.GetManager().ShutdownContext(), + archiver.Type, + w, + setting.Repository.PrefixArchiveFiles, + archiver.CommitID, + ) + _ = w.CloseWithError(err) + done <- err + }(done, w, archiver, gitRepo) + + // TODO: add lfs data to zip + // TODO: add submodule data to zip - // Now we copy it into place - if destArchive, err = os.Create(r.archivePath); err != nil { - log.Error("Unable to open archive " + r.archivePath) - return + if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil { + return nil, fmt.Errorf("unable to write archive: %v", err) } - _, err = io.Copy(destArchive, tmpArchive) - destArchive.Close() + + err = <-done if err != nil { - log.Error("Unable to write archive " + r.archivePath) - return + return nil, err + } + + if archiver.Status == models.RepoArchiverGenerating { + archiver.Status = models.RepoArchiverReady + if err = models.UpdateRepoArchiverStatus(ctx, archiver); err != nil { + return nil, err + } } - // Block any attempt to finalize creating a new request if we're marking - r.archiveComplete = true + return archiver, commiter.Commit() } // ArchiveRepository satisfies the ArchiveRequest being passed in. Processing @@ -255,65 +204,46 @@ func doArchive(r *ArchiveRequest) { // anything. In all cases, the caller should be examining the *ArchiveRequest // being returned for completion, as it may be different than the one they passed // in. -func ArchiveRepository(request *ArchiveRequest) *ArchiveRequest { - // We'll return the request that's already been enqueued if it has been - // enqueued, or we'll immediately enqueue it if it has not been enqueued - // and it is not marked complete. - archiveMutex.Lock() - defer archiveMutex.Unlock() - if rExisting := getArchiveRequest(request.repo, request.commit, request.archiveType); rExisting != nil { - return rExisting - } - if request.archiveComplete { - return request - } +func ArchiveRepository(request *ArchiveRequest) (*models.RepoArchiver, error) { + return doArchive(request) +} + +var archiverQueue queue.UniqueQueue - request.cchan = make(chan struct{}) - archiveInProgress = append(archiveInProgress, request) - go func() { - // Wait to start, if we have the Cond for it. This is currently only - // useful for testing, so that the start and release of queued entries - // can be controlled to examine the queue. - if archiveQueueStartCond != nil { - archiveQueueMutex.Lock() - archiveQueueStartCond.Wait() - archiveQueueMutex.Unlock() +// Init initlize archive +func Init() error { + handler := func(data ...queue.Data) { + for _, datum := range data { + archiveReq, ok := datum.(*ArchiveRequest) + if !ok { + log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) + continue + } + log.Trace("ArchiverData Process: %#v", archiveReq) + if _, err := doArchive(archiveReq); err != nil { + log.Error("Archive %v faild: %v", datum, err) + } } + } - // Drop the mutex while we process the request. This may take a long - // time, and it's not necessary now that we've added the reequest to - // archiveInProgress. - doArchive(request) + archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest)) + if archiverQueue == nil { + return errors.New("unable to create codes indexer queue") + } - if archiveQueueReleaseCond != nil { - archiveQueueMutex.Lock() - archiveQueueReleaseCond.Wait() - archiveQueueMutex.Unlock() - } + go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run) - // Purge this request from the list. To do so, we'll just take the - // index at which we ended up at and swap the final element into that - // position, then chop off the now-redundant final element. The slice - // may have change in between these two segments and we may have moved, - // so we search for it here. We could perhaps avoid this search - // entirely if len(archiveInProgress) == 1, but we should verify - // correctness. - archiveMutex.Lock() - defer archiveMutex.Unlock() - - idx := -1 - for _idx, req := range archiveInProgress { - if req == request { - idx = _idx - break - } - } - if idx == -1 { - log.Error("ArchiveRepository: Failed to find request for removal.") - return - } - archiveInProgress = append(archiveInProgress[:idx], archiveInProgress[idx+1:]...) - }() + return nil +} - return request +// StartArchive push the archive request to the queue +func StartArchive(request *ArchiveRequest) error { + has, err := archiverQueue.Has(request) + if err != nil { + return err + } + if has { + return nil + } + return archiverQueue.Push(request) } diff --git a/services/archiver/archiver_test.go b/services/archiver/archiver_test.go index 6dcd942bf5..3f3f369987 100644 --- a/services/archiver/archiver_test.go +++ b/services/archiver/archiver_test.go @@ -6,108 +6,75 @@ package archiver import ( "path/filepath" - "sync" "testing" "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/test" - "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" ) -var queueMutex sync.Mutex - func TestMain(m *testing.M) { models.MainTest(m, filepath.Join("..", "..")) } func waitForCount(t *testing.T, num int) { - var numQueued int - - // Wait for up to 10 seconds for the queue to be impacted. - timeout := time.Now().Add(10 * time.Second) - for { - numQueued = len(archiveInProgress) - if numQueued == num || time.Now().After(timeout) { - break - } - } - - assert.Len(t, archiveInProgress, num) -} - -func releaseOneEntry(t *testing.T, inFlight []*ArchiveRequest) { - var nowQueued, numQueued int - - numQueued = len(archiveInProgress) - - // Release one, then wait up to 10 seconds for it to complete. - queueMutex.Lock() - archiveQueueReleaseCond.Signal() - queueMutex.Unlock() - timeout := time.Now().Add(10 * time.Second) - for { - nowQueued = len(archiveInProgress) - if nowQueued != numQueued || time.Now().After(timeout) { - break - } - } - - // Make sure we didn't just timeout. - assert.NotEqual(t, numQueued, nowQueued) - // Also make sure that we released only one. - assert.Equal(t, numQueued-1, nowQueued) } func TestArchive_Basic(t *testing.T) { assert.NoError(t, models.PrepareTestDatabase()) - archiveQueueMutex = &queueMutex - archiveQueueStartCond = sync.NewCond(&queueMutex) - archiveQueueReleaseCond = sync.NewCond(&queueMutex) - defer func() { - archiveQueueMutex = nil - archiveQueueStartCond = nil - archiveQueueReleaseCond = nil - }() - ctx := test.MockContext(t, "user27/repo49") firstCommit, secondCommit := "51f84af23134", "aacbdfe9e1c4" - bogusReq := DeriveRequestFrom(ctx, firstCommit+".zip") - assert.Nil(t, bogusReq) - test.LoadRepo(t, ctx, 49) - bogusReq = DeriveRequestFrom(ctx, firstCommit+".zip") - assert.Nil(t, bogusReq) - test.LoadGitRepo(t, ctx) defer ctx.Repo.GitRepo.Close() + bogusReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) + assert.NotNil(t, bogusReq) + assert.EqualValues(t, firstCommit+".zip", bogusReq.GetArchiveName()) + // Check a series of bogus requests. // Step 1, valid commit with a bad extension. - bogusReq = DeriveRequestFrom(ctx, firstCommit+".dilbert") + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".dilbert") + assert.Error(t, err) assert.Nil(t, bogusReq) // Step 2, missing commit. - bogusReq = DeriveRequestFrom(ctx, "dbffff.zip") + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "dbffff.zip") + assert.Error(t, err) assert.Nil(t, bogusReq) // Step 3, doesn't look like branch/tag/commit. - bogusReq = DeriveRequestFrom(ctx, "db.zip") + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "db.zip") + assert.Error(t, err) assert.Nil(t, bogusReq) + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "master.zip") + assert.NoError(t, err) + assert.NotNil(t, bogusReq) + assert.EqualValues(t, "master.zip", bogusReq.GetArchiveName()) + + bogusReq, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, "test/archive.zip") + assert.NoError(t, err) + assert.NotNil(t, bogusReq) + assert.EqualValues(t, "test-archive.zip", bogusReq.GetArchiveName()) + // Now two valid requests, firstCommit with valid extensions. - zipReq := DeriveRequestFrom(ctx, firstCommit+".zip") + zipReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) assert.NotNil(t, zipReq) - tgzReq := DeriveRequestFrom(ctx, firstCommit+".tar.gz") + tgzReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".tar.gz") + assert.NoError(t, err) assert.NotNil(t, tgzReq) - secondReq := DeriveRequestFrom(ctx, secondCommit+".zip") + secondReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".zip") + assert.NoError(t, err) assert.NotNil(t, secondReq) inFlight := make([]*ArchiveRequest, 3) @@ -128,41 +95,9 @@ func TestArchive_Basic(t *testing.T) { // Sleep two seconds to make sure the queue doesn't change. time.Sleep(2 * time.Second) - assert.Len(t, archiveInProgress, 3) - - // Release them all, they'll then stall at the archiveQueueReleaseCond while - // we examine the queue state. - queueMutex.Lock() - archiveQueueStartCond.Broadcast() - queueMutex.Unlock() - - // Iterate through all of the in-flight requests and wait for their - // completion. - for _, req := range inFlight { - req.WaitForCompletion(ctx) - } - - for _, req := range inFlight { - assert.True(t, req.IsComplete()) - exist, err := util.IsExist(req.GetArchivePath()) - assert.NoError(t, err) - assert.True(t, exist) - } - - arbitraryReq := inFlight[0] - // Reopen the channel so we don't double-close, mark it incomplete. We're - // going to run it back through the archiver, and it should get marked - // complete again. - arbitraryReq.cchan = make(chan struct{}) - arbitraryReq.archiveComplete = false - doArchive(arbitraryReq) - assert.True(t, arbitraryReq.IsComplete()) - - // Queues should not have drained yet, because we haven't released them. - // Do so now. - assert.Len(t, archiveInProgress, 3) - - zipReq2 := DeriveRequestFrom(ctx, firstCommit+".zip") + + zipReq2, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) // This zipReq should match what's sitting in the queue, as we haven't // let it release yet. From the consumer's point of view, this looks like // a long-running archive task. @@ -173,46 +108,22 @@ func TestArchive_Basic(t *testing.T) { // predecessor has cleared out of the queue. ArchiveRepository(zipReq2) - // Make sure the queue hasn't grown any. - assert.Len(t, archiveInProgress, 3) - - // Make sure the queue drains properly - releaseOneEntry(t, inFlight) - assert.Len(t, archiveInProgress, 2) - releaseOneEntry(t, inFlight) - assert.Len(t, archiveInProgress, 1) - releaseOneEntry(t, inFlight) - assert.Empty(t, archiveInProgress) - // Now we'll submit a request and TimedWaitForCompletion twice, before and // after we release it. We should trigger both the timeout and non-timeout // cases. - var completed, timedout bool - timedReq := DeriveRequestFrom(ctx, secondCommit+".tar.gz") + timedReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".tar.gz") + assert.NoError(t, err) assert.NotNil(t, timedReq) ArchiveRepository(timedReq) - // Guaranteed to timeout; we haven't signalled the request to start.. - completed, timedout = timedReq.TimedWaitForCompletion(ctx, 2*time.Second) - assert.False(t, completed) - assert.True(t, timedout) - - queueMutex.Lock() - archiveQueueStartCond.Broadcast() - queueMutex.Unlock() - - // Shouldn't timeout, we've now signalled it and it's a small request. - completed, timedout = timedReq.TimedWaitForCompletion(ctx, 15*time.Second) - assert.True(t, completed) - assert.False(t, timedout) - - zipReq2 = DeriveRequestFrom(ctx, firstCommit+".zip") + zipReq2, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip") + assert.NoError(t, err) // Now, we're guaranteed to have released the original zipReq from the queue. // Ensure that we don't get handed back the released entry somehow, but they // should remain functionally equivalent in all fields. The exception here // is zipReq.cchan, which will be non-nil because it's a completed request. // It's fine to go ahead and set it to nil now. - zipReq.cchan = nil + assert.Equal(t, zipReq, zipReq2) assert.False(t, zipReq == zipReq2) |