You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lfs.go 7.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. // Copyright 2019 The Gitea Authors.
  2. // All rights reserved.
  3. // Use of this source code is governed by a MIT-style
  4. // license that can be found in the LICENSE file.
  5. package pull
  6. import (
  7. "bufio"
  8. "bytes"
  9. "fmt"
  10. "io"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "code.gitea.io/gitea/models"
  15. "code.gitea.io/gitea/modules/git"
  16. "code.gitea.io/gitea/modules/lfs"
  17. "code.gitea.io/gitea/modules/log"
  18. )
// LFSPush pushes lfs objects referred to in new commits in the head repository from the base repository
func LFSPush(tmpBasePath, mergeHeadSHA, mergeBaseSHA string, pr *models.PullRequest) error {
	// Now we have to implement git lfs push
	// git rev-list --objects --filter=blob:limit=1k HEAD --not base
	// pass blob shas in to git cat-file --batch-check (possibly unnecessary)
	// ensure only blobs and <=1k size then pass in to git cat-file --batch
	// to read each sha and check each as a pointer
	// Then if they are lfs -> add them to the baseRepo
	//
	// The six stages below are connected by in-memory pipes: each stage runs
	// in its own goroutine, reads from the previous stage's pipe and closes
	// its own writer when done so the downstream stage terminates.
	revListReader, revListWriter := io.Pipe()
	shasToCheckReader, shasToCheckWriter := io.Pipe()
	catFileCheckReader, catFileCheckWriter := io.Pipe()
	shasToBatchReader, shasToBatchWriter := io.Pipe()
	catFileBatchReader, catFileBatchWriter := io.Pipe()
	// Buffered (capacity 1) so doRevListObjects can report its error without
	// blocking even though LFSPush only receives after wg.Wait().
	errChan := make(chan error, 1)
	wg := sync.WaitGroup{}
	wg.Add(6)
	// Create the go-routines in reverse order so every reader is already
	// consuming before its writer starts producing (io.Pipe writes block
	// until read).
	// 6. Take the output of cat-file --batch and check if each file in turn
	// to see if they're pointers to files in the LFS store associated with
	// the head repo and add them to the base repo if so
	go readCatFileBatch(catFileBatchReader, &wg, pr)
	// 5. Take the shas of the blobs and batch read them
	go doCatFileBatch(shasToBatchReader, catFileBatchWriter, &wg, tmpBasePath)
	// 4. From the provided objects restrict to blobs <=1k
	go readCatFileBatchCheck(catFileCheckReader, shasToBatchWriter, &wg)
	// 3. Run batch-check on the objects retrieved from rev-list
	go doCatFileBatchCheck(shasToCheckReader, catFileCheckWriter, &wg, tmpBasePath)
	// 2. Check each object retrieved rejecting those without names as they will be commits or trees
	go readRevListObjects(revListReader, shasToCheckWriter, &wg)
	// 1. Run rev-list objects from mergeHead to mergeBase
	go doRevListObjects(revListWriter, &wg, tmpBasePath, mergeHeadSHA, mergeBaseSHA, errChan)
	wg.Wait()
	// Non-blocking drain: only the rev-list stage reports on errChan; the
	// other stages propagate failures through the pipes themselves.
	select {
	case err, has := <-errChan:
		if has {
			return err
		}
	default:
	}
	return nil
}
  60. func doRevListObjects(revListWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath, headSHA, baseSHA string, errChan chan<- error) {
  61. defer wg.Done()
  62. defer revListWriter.Close()
  63. stderr := new(bytes.Buffer)
  64. var errbuf strings.Builder
  65. cmd := git.NewCommand("rev-list", "--objects", headSHA, "--not", baseSHA)
  66. if err := cmd.RunInDirPipeline(tmpBasePath, revListWriter, stderr); err != nil {
  67. log.Error("git rev-list [%s]: %v - %s", tmpBasePath, err, errbuf.String())
  68. errChan <- fmt.Errorf("git rev-list [%s]: %v - %s", tmpBasePath, err, errbuf.String())
  69. }
  70. }
  71. func readRevListObjects(revListReader *io.PipeReader, shasToCheckWriter *io.PipeWriter, wg *sync.WaitGroup) {
  72. defer wg.Done()
  73. defer revListReader.Close()
  74. defer shasToCheckWriter.Close()
  75. scanner := bufio.NewScanner(revListReader)
  76. for scanner.Scan() {
  77. line := scanner.Text()
  78. if len(line) == 0 {
  79. continue
  80. }
  81. fields := strings.Split(line, " ")
  82. if len(fields) < 2 || len(fields[1]) == 0 {
  83. continue
  84. }
  85. toWrite := []byte(fields[0] + "\n")
  86. for len(toWrite) > 0 {
  87. n, err := shasToCheckWriter.Write(toWrite)
  88. if err != nil {
  89. _ = revListReader.CloseWithError(err)
  90. break
  91. }
  92. toWrite = toWrite[n:]
  93. }
  94. }
  95. _ = shasToCheckWriter.CloseWithError(scanner.Err())
  96. }
  97. func doCatFileBatchCheck(shasToCheckReader *io.PipeReader, catFileCheckWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath string) {
  98. defer wg.Done()
  99. defer shasToCheckReader.Close()
  100. defer catFileCheckWriter.Close()
  101. stderr := new(bytes.Buffer)
  102. var errbuf strings.Builder
  103. cmd := git.NewCommand("cat-file", "--batch-check")
  104. if err := cmd.RunInDirFullPipeline(tmpBasePath, catFileCheckWriter, stderr, shasToCheckReader); err != nil {
  105. _ = catFileCheckWriter.CloseWithError(fmt.Errorf("git cat-file --batch-check [%s]: %v - %s", tmpBasePath, err, errbuf.String()))
  106. }
  107. }
  108. func readCatFileBatchCheck(catFileCheckReader *io.PipeReader, shasToBatchWriter *io.PipeWriter, wg *sync.WaitGroup) {
  109. defer wg.Done()
  110. defer catFileCheckReader.Close()
  111. scanner := bufio.NewScanner(catFileCheckReader)
  112. defer func() {
  113. _ = shasToBatchWriter.CloseWithError(scanner.Err())
  114. }()
  115. for scanner.Scan() {
  116. line := scanner.Text()
  117. if len(line) == 0 {
  118. continue
  119. }
  120. fields := strings.Split(line, " ")
  121. if len(fields) < 3 || fields[1] != "blob" {
  122. continue
  123. }
  124. size, _ := strconv.Atoi(fields[2])
  125. if size > 1024 {
  126. continue
  127. }
  128. toWrite := []byte(fields[0] + "\n")
  129. for len(toWrite) > 0 {
  130. n, err := shasToBatchWriter.Write(toWrite)
  131. if err != nil {
  132. _ = catFileCheckReader.CloseWithError(err)
  133. break
  134. }
  135. toWrite = toWrite[n:]
  136. }
  137. }
  138. }
  139. func doCatFileBatch(shasToBatchReader *io.PipeReader, catFileBatchWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath string) {
  140. defer wg.Done()
  141. defer shasToBatchReader.Close()
  142. defer catFileBatchWriter.Close()
  143. stderr := new(bytes.Buffer)
  144. var errbuf strings.Builder
  145. if err := git.NewCommand("cat-file", "--batch").RunInDirFullPipeline(tmpBasePath, catFileBatchWriter, stderr, shasToBatchReader); err != nil {
  146. _ = shasToBatchReader.CloseWithError(fmt.Errorf("git rev-list [%s]: %v - %s", tmpBasePath, err, errbuf.String()))
  147. }
  148. }
// readCatFileBatch consumes `git cat-file --batch` output from
// catFileBatchReader. Each blob is checked for being an LFS pointer; for
// pointers whose object is associated with the head repo's LFS store, a
// matching meta object is created on the base repo so the merged result can
// resolve the pointer.
func readCatFileBatch(catFileBatchReader *io.PipeReader, wg *sync.WaitGroup, pr *models.PullRequest) {
	defer wg.Done()
	defer catFileBatchReader.Close()
	bufferedReader := bufio.NewReader(catFileBatchReader)
	// Upstream only forwards blobs <= 1024 bytes; +1 for the newline
	// cat-file emits after each object's content.
	buf := make([]byte, 1025)
	for {
		// Each object starts with a header line: "<sha> <type> <size>\n".
		// Read (and discard) the sha up to the first space.
		_, err := bufferedReader.ReadString(' ')
		if err != nil {
			_ = catFileBatchReader.CloseWithError(err)
			break
		}
		// Discard the object type field ("blob ") — upstream stages have
		// already restricted the stream to blobs.
		if _, err := bufferedReader.ReadString(' '); err != nil {
			_ = catFileBatchReader.CloseWithError(err)
			break
		}
		sizeStr, err := bufferedReader.ReadString('\n')
		if err != nil {
			_ = catFileBatchReader.CloseWithError(err)
			break
		}
		// Strip the trailing '\n' before parsing the decimal size.
		size, err := strconv.Atoi(sizeStr[:len(sizeStr)-1])
		if err != nil {
			_ = catFileBatchReader.CloseWithError(err)
			break
		}
		// Read the content plus the one-byte newline separator, then trim
		// the separator off before inspecting the content.
		pointerBuf := buf[:size+1]
		if _, err := io.ReadFull(bufferedReader, pointerBuf); err != nil {
			_ = catFileBatchReader.CloseWithError(err)
			break
		}
		pointerBuf = pointerBuf[:size]
		// Now we need to check if the pointerBuf is an LFS pointer
		pointer := lfs.IsPointerFile(&pointerBuf)
		if pointer == nil {
			continue
		}
		// Then we need to check that this pointer is in the db: the LFS
		// object must already be associated with the head repo.
		if _, err := pr.HeadRepo.GetLFSMetaObjectByOid(pointer.Oid); err != nil {
			if err == models.ErrLFSObjectNotExist {
				// Best-effort: log and keep processing the remaining blobs.
				log.Warn("During merge of: %d in %-v, there is a pointer to LFS Oid: %s which although present in the LFS store is not associated with the head repo %-v", pr.Index, pr.BaseRepo, pointer.Oid, pr.HeadRepo)
				continue
			}
			_ = catFileBatchReader.CloseWithError(err)
			break
		}
		// OK we have a pointer that is associated with the head repo
		// and is actually a file in the LFS
		// Therefore it should be associated with the base repo
		pointer.RepositoryID = pr.BaseRepoID
		if _, err := models.NewLFSMetaObject(pointer); err != nil {
			_ = catFileBatchReader.CloseWithError(err)
			break
		}
	}
}