You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

runner.go 8.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. // Copyright 2022 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package runner
  4. import (
  5. "context"
  6. "errors"
  7. "net/http"
  8. actions_model "code.gitea.io/gitea/models/actions"
  9. "code.gitea.io/gitea/modules/actions"
  10. "code.gitea.io/gitea/modules/log"
  11. "code.gitea.io/gitea/modules/util"
  12. actions_service "code.gitea.io/gitea/services/actions"
  13. runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
  14. "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
  15. "connectrpc.com/connect"
  16. gouuid "github.com/google/uuid"
  17. "google.golang.org/grpc/codes"
  18. "google.golang.org/grpc/status"
  19. )
  20. func NewRunnerServiceHandler() (string, http.Handler) {
  21. return runnerv1connect.NewRunnerServiceHandler(
  22. &Service{},
  23. connect.WithCompressMinBytes(1024),
  24. withRunner,
  25. )
  26. }
  27. var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
  28. type Service struct{}
  29. // Register for new runner.
  30. func (s *Service) Register(
  31. ctx context.Context,
  32. req *connect.Request[runnerv1.RegisterRequest],
  33. ) (*connect.Response[runnerv1.RegisterResponse], error) {
  34. if req.Msg.Token == "" || req.Msg.Name == "" {
  35. return nil, errors.New("missing runner token, name")
  36. }
  37. runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
  38. if err != nil {
  39. return nil, errors.New("runner registration token not found")
  40. }
  41. if !runnerToken.IsActive {
  42. return nil, errors.New("runner registration token has been invalidated, please use the latest one")
  43. }
  44. labels := req.Msg.Labels
  45. // TODO: agent_labels should be removed from pb after Gitea 1.20 released.
  46. // Old version runner's agent_labels slice is not empty and labels slice is empty.
  47. // And due to compatibility with older versions, it is temporarily marked as Deprecated in pb, so use `//nolint` here.
  48. if len(req.Msg.AgentLabels) > 0 && len(req.Msg.Labels) == 0 { //nolint:staticcheck
  49. labels = req.Msg.AgentLabels //nolint:staticcheck
  50. }
  51. // create new runner
  52. name, _ := util.SplitStringAtByteN(req.Msg.Name, 255)
  53. runner := &actions_model.ActionRunner{
  54. UUID: gouuid.New().String(),
  55. Name: name,
  56. OwnerID: runnerToken.OwnerID,
  57. RepoID: runnerToken.RepoID,
  58. Version: req.Msg.Version,
  59. AgentLabels: labels,
  60. }
  61. if err := runner.GenerateToken(); err != nil {
  62. return nil, errors.New("can't generate token")
  63. }
  64. // create new runner
  65. if err := actions_model.CreateRunner(ctx, runner); err != nil {
  66. return nil, errors.New("can't create new runner")
  67. }
  68. // update token status
  69. runnerToken.IsActive = true
  70. if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
  71. return nil, errors.New("can't update runner token status")
  72. }
  73. res := connect.NewResponse(&runnerv1.RegisterResponse{
  74. Runner: &runnerv1.Runner{
  75. Id: runner.ID,
  76. Uuid: runner.UUID,
  77. Token: runner.Token,
  78. Name: runner.Name,
  79. Version: runner.Version,
  80. Labels: runner.AgentLabels,
  81. },
  82. })
  83. return res, nil
  84. }
  85. func (s *Service) Declare(
  86. ctx context.Context,
  87. req *connect.Request[runnerv1.DeclareRequest],
  88. ) (*connect.Response[runnerv1.DeclareResponse], error) {
  89. runner := GetRunner(ctx)
  90. runner.AgentLabels = req.Msg.Labels
  91. runner.Version = req.Msg.Version
  92. if err := actions_model.UpdateRunner(ctx, runner, "agent_labels", "version"); err != nil {
  93. return nil, status.Errorf(codes.Internal, "update runner: %v", err)
  94. }
  95. return connect.NewResponse(&runnerv1.DeclareResponse{
  96. Runner: &runnerv1.Runner{
  97. Id: runner.ID,
  98. Uuid: runner.UUID,
  99. Token: runner.Token,
  100. Name: runner.Name,
  101. Version: runner.Version,
  102. Labels: runner.AgentLabels,
  103. },
  104. }), nil
  105. }
  106. // FetchTask assigns a task to the runner
  107. func (s *Service) FetchTask(
  108. ctx context.Context,
  109. req *connect.Request[runnerv1.FetchTaskRequest],
  110. ) (*connect.Response[runnerv1.FetchTaskResponse], error) {
  111. runner := GetRunner(ctx)
  112. var task *runnerv1.Task
  113. tasksVersion := req.Msg.TasksVersion // task version from runner
  114. latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
  115. if err != nil {
  116. return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
  117. } else if latestVersion == 0 {
  118. if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
  119. return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
  120. }
  121. // if we don't increase the value of `latestVersion` here,
  122. // the response of FetchTask will return tasksVersion as zero.
  123. // and the runner will treat it as an old version of Gitea.
  124. latestVersion++
  125. }
  126. if tasksVersion != latestVersion {
  127. // if the task version in request is not equal to the version in db,
  128. // it means there may still be some tasks not be assgined.
  129. // try to pick a task for the runner that send the request.
  130. if t, ok, err := pickTask(ctx, runner); err != nil {
  131. log.Error("pick task failed: %v", err)
  132. return nil, status.Errorf(codes.Internal, "pick task: %v", err)
  133. } else if ok {
  134. task = t
  135. }
  136. }
  137. res := connect.NewResponse(&runnerv1.FetchTaskResponse{
  138. Task: task,
  139. TasksVersion: latestVersion,
  140. })
  141. return res, nil
  142. }
  143. // UpdateTask updates the task status.
  144. func (s *Service) UpdateTask(
  145. ctx context.Context,
  146. req *connect.Request[runnerv1.UpdateTaskRequest],
  147. ) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
  148. task, err := actions_model.UpdateTaskByState(ctx, req.Msg.State)
  149. if err != nil {
  150. return nil, status.Errorf(codes.Internal, "update task: %v", err)
  151. }
  152. for k, v := range req.Msg.Outputs {
  153. if len(k) > 255 {
  154. log.Warn("Ignore the output of task %d because the key is too long: %q", task.ID, k)
  155. continue
  156. }
  157. // The value can be a maximum of 1 MB
  158. if l := len(v); l > 1024*1024 {
  159. log.Warn("Ignore the output %q of task %d because the value is too long: %v", k, task.ID, l)
  160. continue
  161. }
  162. // There's another limitation on GitHub that the total of all outputs in a workflow run can be a maximum of 50 MB.
  163. // We don't check the total size here because it's not easy to do, and it doesn't really worth it.
  164. // See https://docs.github.com/en/actions/using-jobs/defining-outputs-for-jobs
  165. if err := actions_model.InsertTaskOutputIfNotExist(ctx, task.ID, k, v); err != nil {
  166. log.Warn("Failed to insert the output %q of task %d: %v", k, task.ID, err)
  167. // It's ok not to return errors, the runner will resend the outputs.
  168. }
  169. }
  170. sentOutputs, err := actions_model.FindTaskOutputKeyByTaskID(ctx, task.ID)
  171. if err != nil {
  172. log.Warn("Failed to find the sent outputs of task %d: %v", task.ID, err)
  173. // It's not to return errors, it can be handled when the runner resends sent outputs.
  174. }
  175. if err := task.LoadJob(ctx); err != nil {
  176. return nil, status.Errorf(codes.Internal, "load job: %v", err)
  177. }
  178. if err := task.Job.LoadRun(ctx); err != nil {
  179. return nil, status.Errorf(codes.Internal, "load run: %v", err)
  180. }
  181. // don't create commit status for cron job
  182. if task.Job.Run.ScheduleID == 0 {
  183. actions_service.CreateCommitStatus(ctx, task.Job)
  184. }
  185. if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
  186. if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
  187. log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
  188. }
  189. }
  190. return connect.NewResponse(&runnerv1.UpdateTaskResponse{
  191. State: &runnerv1.TaskState{
  192. Id: req.Msg.State.Id,
  193. Result: task.Status.AsResult(),
  194. },
  195. SentOutputs: sentOutputs,
  196. }), nil
  197. }
  198. // UpdateLog uploads log of the task.
  199. func (s *Service) UpdateLog(
  200. ctx context.Context,
  201. req *connect.Request[runnerv1.UpdateLogRequest],
  202. ) (*connect.Response[runnerv1.UpdateLogResponse], error) {
  203. res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
  204. task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
  205. if err != nil {
  206. return nil, status.Errorf(codes.Internal, "get task: %v", err)
  207. }
  208. ack := task.LogLength
  209. if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
  210. res.Msg.AckIndex = ack
  211. return res, nil
  212. }
  213. if task.LogInStorage {
  214. return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
  215. }
  216. rows := req.Msg.Rows[ack-req.Msg.Index:]
  217. ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
  218. if err != nil {
  219. return nil, status.Errorf(codes.Internal, "write logs: %v", err)
  220. }
  221. task.LogLength += int64(len(rows))
  222. for _, n := range ns {
  223. task.LogIndexes = append(task.LogIndexes, task.LogSize)
  224. task.LogSize += int64(n)
  225. }
  226. res.Msg.AckIndex = task.LogLength
  227. var remove func()
  228. if req.Msg.NoMore {
  229. task.LogInStorage = true
  230. remove, err = actions.TransferLogs(ctx, task.LogFilename)
  231. if err != nil {
  232. return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
  233. }
  234. }
  235. if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
  236. return nil, status.Errorf(codes.Internal, "update task: %v", err)
  237. }
  238. if remove != nil {
  239. remove()
  240. }
  241. return res, nil
  242. }