Backport #29298
Fixes the reason why #29101 is hard to replicate.
Related #29297
Create a repo with a file with minimum size 4097 bytes (I use 10000) and
execute the following code:
```go
gitRepo, err := gitrepo.OpenRepository(db.DefaultContext, <repo>)
assert.NoError(t, err)
commit, err := gitRepo.GetCommit(<sha>)
assert.NoError(t, err)
entry, err := commit.GetTreeEntryByPath(<file>)
assert.NoError(t, err)
b := entry.Blob()
// Create a reader
r, err := b.DataAsync()
assert.NoError(t, err)
defer r.Close()
// Create a second reader
r2, err := b.DataAsync()
assert.NoError(t, err) // Should be no error but is ErrNotExist
defer r2.Close()
```
The problem is the check in `CatFileBatch`:
79217ea63c/modules/git/repo_base_nogogit.go (L81-L87)
`Buffered() > 0` is used to check if there is a "operation" in progress
at the moment. This is a problem because we can't control the internal
buffer in the `bufio.Reader`. The code above demonstrates a sequence
which initiates an operation for which the code thinks there is no
active processing. The second call to `DataAsync()` therefore reuses the
existing instances instead of creating a new batch reader.
tags/v1.21.6
gpgSettings *GPGSettings | gpgSettings *GPGSettings | ||||
batchInUse bool | |||||
batchCancel context.CancelFunc | batchCancel context.CancelFunc | ||||
batchReader *bufio.Reader | batchReader *bufio.Reader | ||||
batchWriter WriteCloserError | batchWriter WriteCloserError | ||||
checkInUse bool | |||||
checkCancel context.CancelFunc | checkCancel context.CancelFunc | ||||
checkReader *bufio.Reader | checkReader *bufio.Reader | ||||
checkWriter WriteCloserError | checkWriter WriteCloserError | ||||
// CatFileBatch obtains a CatFileBatch for this repository | // CatFileBatch obtains a CatFileBatch for this repository | ||||
func (repo *Repository) CatFileBatch(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) { | func (repo *Repository) CatFileBatch(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) { | ||||
if repo.batchCancel == nil || repo.batchReader.Buffered() > 0 { | |||||
if repo.batchCancel == nil || repo.batchInUse { | |||||
log.Debug("Opening temporary cat file batch for: %s", repo.Path) | log.Debug("Opening temporary cat file batch for: %s", repo.Path) | ||||
return CatFileBatch(ctx, repo.Path) | return CatFileBatch(ctx, repo.Path) | ||||
} | } | ||||
return repo.batchWriter, repo.batchReader, func() {} | |||||
repo.batchInUse = true | |||||
return repo.batchWriter, repo.batchReader, func() { | |||||
repo.batchInUse = false | |||||
} | |||||
} | } | ||||
// CatFileBatchCheck obtains a CatFileBatchCheck for this repository | // CatFileBatchCheck obtains a CatFileBatchCheck for this repository | ||||
func (repo *Repository) CatFileBatchCheck(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) { | func (repo *Repository) CatFileBatchCheck(ctx context.Context) (WriteCloserError, *bufio.Reader, func()) { | ||||
if repo.checkCancel == nil || repo.checkReader.Buffered() > 0 { | |||||
log.Debug("Opening temporary cat file batch-check: %s", repo.Path) | |||||
if repo.checkCancel == nil || repo.checkInUse { | |||||
log.Debug("Opening temporary cat file batch-check for: %s", repo.Path) | |||||
return CatFileBatchCheck(ctx, repo.Path) | return CatFileBatchCheck(ctx, repo.Path) | ||||
} | } | ||||
return repo.checkWriter, repo.checkReader, func() {} | |||||
repo.checkInUse = true | |||||
return repo.checkWriter, repo.checkReader, func() { | |||||
repo.checkInUse = false | |||||
} | |||||
} | } | ||||
// Close this repository, in particular close the underlying gogitStorage if this is not nil | |||||
func (repo *Repository) Close() (err error) { | func (repo *Repository) Close() (err error) { | ||||
if repo == nil { | if repo == nil { | ||||
return nil | return nil | ||||
repo.batchReader = nil | repo.batchReader = nil | ||||
repo.batchWriter = nil | repo.batchWriter = nil | ||||
repo.batchCancel = nil | repo.batchCancel = nil | ||||
repo.batchInUse = false | |||||
} | } | ||||
if repo.checkCancel != nil { | if repo.checkCancel != nil { | ||||
repo.checkCancel() | repo.checkCancel() | ||||
repo.checkCancel = nil | repo.checkCancel = nil | ||||
repo.checkReader = nil | repo.checkReader = nil | ||||
repo.checkWriter = nil | repo.checkWriter = nil | ||||
repo.checkInUse = false | |||||
} | } | ||||
repo.LastCommitCache = nil | repo.LastCommitCache = nil | ||||
repo.tagCache = nil | repo.tagCache = nil |
package integration | package integration | ||||
import ( | import ( | ||||
"bytes" | |||||
"encoding/hex" | "encoding/hex" | ||||
"fmt" | "fmt" | ||||
"math/rand" | "math/rand" | ||||
"code.gitea.io/gitea/modules/lfs" | "code.gitea.io/gitea/modules/lfs" | ||||
"code.gitea.io/gitea/modules/setting" | "code.gitea.io/gitea/modules/setting" | ||||
api "code.gitea.io/gitea/modules/structs" | api "code.gitea.io/gitea/modules/structs" | ||||
files_service "code.gitea.io/gitea/services/repository/files" | |||||
"code.gitea.io/gitea/tests" | "code.gitea.io/gitea/tests" | ||||
"github.com/stretchr/testify/assert" | "github.com/stretchr/testify/assert" | ||||
t.Run("CheckoutMasterAgain", doGitCheckoutBranch(dstPath, "master")) | t.Run("CheckoutMasterAgain", doGitCheckoutBranch(dstPath, "master")) | ||||
} | } | ||||
} | } | ||||
func TestDataAsync_Issue29101(t *testing.T) { | |||||
onGiteaRun(t, func(t *testing.T, u *url.URL) { | |||||
user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) | |||||
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1}) | |||||
resp, err := files_service.ChangeRepoFiles(db.DefaultContext, repo, user, &files_service.ChangeRepoFilesOptions{ | |||||
Files: []*files_service.ChangeRepoFile{ | |||||
{ | |||||
Operation: "create", | |||||
TreePath: "test.txt", | |||||
ContentReader: bytes.NewReader(make([]byte, 10000)), | |||||
}, | |||||
}, | |||||
OldBranch: repo.DefaultBranch, | |||||
NewBranch: repo.DefaultBranch, | |||||
}) | |||||
assert.NoError(t, err) | |||||
sha := resp.Commit.SHA | |||||
gitRepo, err := git.OpenRepository(db.DefaultContext, repo.RepoPath()) | |||||
assert.NoError(t, err) | |||||
commit, err := gitRepo.GetCommit(sha) | |||||
assert.NoError(t, err) | |||||
entry, err := commit.GetTreeEntryByPath("test.txt") | |||||
assert.NoError(t, err) | |||||
b := entry.Blob() | |||||
r, err := b.DataAsync() | |||||
assert.NoError(t, err) | |||||
defer r.Close() | |||||
r2, err := b.DataAsync() | |||||
assert.NoError(t, err) | |||||
defer r2.Close() | |||||
}) | |||||
} |