You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

artifacts_chunks.go 6.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "crypto/md5"
  6. "encoding/base64"
  7. "fmt"
  8. "io"
  9. "path/filepath"
  10. "sort"
  11. "time"
  12. "code.gitea.io/gitea/models/actions"
  13. "code.gitea.io/gitea/modules/log"
  14. "code.gitea.io/gitea/modules/storage"
  15. )
  16. func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
  17. artifact *actions.ActionArtifact,
  18. contentSize, runID int64,
  19. ) (int64, error) {
  20. // parse content-range header, format: bytes 0-1023/146515
  21. contentRange := ctx.Req.Header.Get("Content-Range")
  22. start, end, length := int64(0), int64(0), int64(0)
  23. if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
  24. log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
  25. return -1, fmt.Errorf("parse content range error: %v", err)
  26. }
  27. // build chunk store path
  28. storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
  29. // use io.TeeReader to avoid reading all body to md5 sum.
  30. // it writes data to hasher after reading end
  31. // if hash is not matched, delete the read-end result
  32. hasher := md5.New()
  33. r := io.TeeReader(ctx.Req.Body, hasher)
  34. // save chunk to storage
  35. writtenSize, err := st.Save(storagePath, r, -1)
  36. if err != nil {
  37. return -1, fmt.Errorf("save chunk to storage error: %v", err)
  38. }
  39. // check md5
  40. reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
  41. chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
  42. log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
  43. // if md5 not match, delete the chunk
  44. if reqMd5String != chunkMd5String || writtenSize != contentSize {
  45. if err := st.Delete(storagePath); err != nil {
  46. log.Error("Error deleting chunk: %s, %v", storagePath, err)
  47. }
  48. return -1, fmt.Errorf("md5 not match")
  49. }
  50. log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
  51. storagePath, contentSize, artifact.ID, start, end)
  52. // return chunk total size
  53. return length, nil
  54. }
  55. type chunkFileItem struct {
  56. RunID int64
  57. ArtifactID int64
  58. Start int64
  59. End int64
  60. Path string
  61. }
  62. func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
  63. storageDir := fmt.Sprintf("tmp%d", runID)
  64. var chunks []*chunkFileItem
  65. if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
  66. baseName := filepath.Base(fpath)
  67. // when read chunks from storage, it only contains storage dir and basename,
  68. // no matter the subdirectory setting in storage config
  69. item := chunkFileItem{Path: storageDir + "/" + baseName}
  70. if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
  71. return fmt.Errorf("parse content range error: %v", err)
  72. }
  73. chunks = append(chunks, &item)
  74. return nil
  75. }); err != nil {
  76. return nil, err
  77. }
  78. // chunks group by artifact id
  79. chunksMap := make(map[int64][]*chunkFileItem)
  80. for _, c := range chunks {
  81. chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
  82. }
  83. return chunksMap, nil
  84. }
  85. func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
  86. // read all db artifacts by name
  87. artifacts, err := actions.ListArtifactsByRunIDAndName(ctx, runID, artifactName)
  88. if err != nil {
  89. return err
  90. }
  91. // read all uploading chunks from storage
  92. chunksMap, err := listChunksByRunID(st, runID)
  93. if err != nil {
  94. return err
  95. }
  96. // range db artifacts to merge chunks
  97. for _, art := range artifacts {
  98. chunks, ok := chunksMap[art.ID]
  99. if !ok {
  100. log.Debug("artifact %d chunks not found", art.ID)
  101. continue
  102. }
  103. if err := mergeChunksForArtifact(ctx, chunks, st, art); err != nil {
  104. return err
  105. }
  106. }
  107. return nil
  108. }
// mergeChunksForArtifact concatenates the uploaded chunks of one artifact into
// a single file in storage, verifies the merged size against the artifact's
// recorded compressed size, updates the artifact row, and finally deletes the
// now-redundant chunk files.
//
// If the chunks do not yet cover the whole compressed file (the upload is
// still in progress), it returns nil without touching anything so a later
// call can retry the merge.
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact) error {
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].Start < chunks[j].Start
	})
	allChunks := make([]*chunkFileItem, 0)
	startAt := int64(-1)
	// check if all chunks are uploaded and in order and clean repeated chunks
	for _, c := range chunks {
		// startAt is -1 means this is the first chunk
		// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
		// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
		// NOTE: duplicates and out-of-order chunks are silently dropped here;
		// a genuinely missing chunk is caught by the size check below
		if c.Start == (startAt + 1) {
			allChunks = append(allChunks, c)
			startAt = c.End
		}
	}
	// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
	if startAt+1 != artifact.FileCompressedSize {
		log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
		return nil
	}
	// use multiReader to stream all chunk files in sequence without buffering
	readers := make([]io.Reader, 0, len(allChunks))
	closeReaders := func() {
		for _, r := range readers {
			_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
		}
		// nil the slice so the second (cleanup) defer does not re-close
		readers = nil
	}
	defer closeReaders()
	for _, c := range allChunks {
		var readCloser io.ReadCloser
		var err error
		if readCloser, err = st.Open(c.Path); err != nil {
			return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
		}
		readers = append(readers, readCloser)
	}
	mergedReader := io.MultiReader(readers...)
	// if chunk is gzip, use gz as extension
	// download-artifact action will use content-encoding header to decide if it should decompress the file
	extension := "chunk"
	if artifact.ContentEncoding == "gzip" {
		extension = "chunk.gz"
	}
	// save merged file; RunID%255 / ID%255 shards the storage layout into
	// fixed-size top-level directories
	storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
	written, err := st.Save(storagePath, mergedReader, -1)
	if err != nil {
		return fmt.Errorf("save merged file error: %v", err)
	}
	if written != artifact.FileCompressedSize {
		return fmt.Errorf("merged file size is not equal to chunk length")
	}
	// chunk cleanup is only scheduled after a successful save: an earlier
	// failure returns before this defer is registered, keeping the chunks
	// on disk so the merge can be retried
	defer func() {
		closeReaders() // close before delete
		// drop chunks
		for _, c := range chunks {
			if err := st.Delete(c.Path); err != nil {
				log.Warn("Error deleting chunk: %s, %v", c.Path, err)
			}
		}
	}()
	// save storage path to artifact
	log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
	// if artifact is already uploaded, delete the old file
	if artifact.StoragePath != "" {
		if err := st.Delete(artifact.StoragePath); err != nil {
			log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
		}
	}
	artifact.StoragePath = storagePath
	artifact.Status = int64(actions.ArtifactStatusUploadConfirmed)
	if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
		return fmt.Errorf("update artifact error: %v", err)
	}
	return nil
}