diff options
Diffstat (limited to 'routers/api/actions/runner/runner.go')
-rw-r--r-- | routers/api/actions/runner/runner.go | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go new file mode 100644 index 0000000000..7dbab9da0a --- /dev/null +++ b/routers/api/actions/runner/runner.go @@ -0,0 +1,221 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package runner + +import ( + "context" + "errors" + "net/http" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/modules/actions" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + actions_service "code.gitea.io/gitea/services/actions" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect" + "github.com/bufbuild/connect-go" + gouuid "github.com/google/uuid" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func NewRunnerServiceHandler() (string, http.Handler) { + return runnerv1connect.NewRunnerServiceHandler( + &Service{}, + connect.WithCompressMinBytes(1024), + withRunner, + ) +} + +var _ runnerv1connect.RunnerServiceClient = (*Service)(nil) + +type Service struct { + runnerv1connect.UnimplementedRunnerServiceHandler +} + +// Register for new runner. +func (s *Service) Register( + ctx context.Context, + req *connect.Request[runnerv1.RegisterRequest], +) (*connect.Response[runnerv1.RegisterResponse], error) { + if req.Msg.Token == "" || req.Msg.Name == "" { + return nil, errors.New("missing runner token, name") + } + + runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token) + if err != nil { + return nil, errors.New("runner token not found") + } + + if runnerToken.IsActive { + return nil, errors.New("runner token has already activated") + } + + // create new runner + runner := &actions_model.ActionRunner{ + UUID: gouuid.New().String(), + Name: req.Msg.Name, + OwnerID: runnerToken.OwnerID, + RepoID: runnerToken.RepoID, + AgentLabels: req.Msg.AgentLabels, + CustomLabels: req.Msg.CustomLabels, + } + if err := runner.GenerateToken(); err != nil { + return nil, errors.New("can't generate token") + } + + // create new runner + if err := actions_model.CreateRunner(ctx, runner); err != nil { + return nil, errors.New("can't create new runner") + } + + // update token status + runnerToken.IsActive = true + if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil { + return nil, errors.New("can't update runner token status") + } + + res := connect.NewResponse(&runnerv1.RegisterResponse{ + Runner: &runnerv1.Runner{ + Id: runner.ID, + Uuid: runner.UUID, + Token: runner.Token, + Name: runner.Name, + AgentLabels: runner.AgentLabels, + CustomLabels: runner.CustomLabels, + }, + }) + + return res, nil +} + +// FetchTask assigns a task to the runner +func (s *Service) FetchTask( + ctx context.Context, + req *connect.Request[runnerv1.FetchTaskRequest], +) (*connect.Response[runnerv1.FetchTaskResponse], error) { + runner := GetRunner(ctx) + + var task *runnerv1.Task + if t, ok, err := pickTask(ctx, runner); err != nil { + log.Error("pick task failed: %v", err) + return nil, status.Errorf(codes.Internal, "pick task: %v", err) + } else if ok { + task = t + } + + res := connect.NewResponse(&runnerv1.FetchTaskResponse{ + Task: task, + }) + return res, nil +} + +// UpdateTask updates the task status. +func (s *Service) UpdateTask( + ctx context.Context, + req *connect.Request[runnerv1.UpdateTaskRequest], +) (*connect.Response[runnerv1.UpdateTaskResponse], error) { + { + // to debug strange runner behaviors, it could be removed if all problems have been solved. + stateMsg, _ := json.Marshal(req.Msg.State) + log.Trace("update task with state: %s", stateMsg) + } + + // Get Task first + task, err := actions_model.GetTaskByID(ctx, req.Msg.State.Id) + if err != nil { + return nil, status.Errorf(codes.Internal, "can't find the task: %v", err) + } + if task.Status.IsCancelled() { + return connect.NewResponse(&runnerv1.UpdateTaskResponse{ + State: &runnerv1.TaskState{ + Id: req.Msg.State.Id, + Result: task.Status.AsResult(), + }, + }), nil + } + + task, err = actions_model.UpdateTaskByState(ctx, req.Msg.State) + if err != nil { + return nil, status.Errorf(codes.Internal, "update task: %v", err) + } + + if err := task.LoadJob(ctx); err != nil { + return nil, status.Errorf(codes.Internal, "load job: %v", err) + } + + if err := actions_service.CreateCommitStatus(ctx, task.Job); err != nil { + log.Error("Update commit status failed: %v", err) + // go on + } + + if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { + if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil { + log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err) + } + } + + return connect.NewResponse(&runnerv1.UpdateTaskResponse{ + State: &runnerv1.TaskState{ + Id: req.Msg.State.Id, + Result: task.Status.AsResult(), + }, + }), nil +} + +// UpdateLog uploads log of the task. +func (s *Service) UpdateLog( + ctx context.Context, + req *connect.Request[runnerv1.UpdateLogRequest], +) (*connect.Response[runnerv1.UpdateLogResponse], error) { + res := connect.NewResponse(&runnerv1.UpdateLogResponse{}) + + task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId) + if err != nil { + return nil, status.Errorf(codes.Internal, "get task: %v", err) + } + ack := task.LogLength + + if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack { + res.Msg.AckIndex = ack + return res, nil + } + + if task.LogInStorage { + return nil, status.Errorf(codes.AlreadyExists, "log file has been archived") + } + + rows := req.Msg.Rows[ack-req.Msg.Index:] + ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows) + if err != nil { + return nil, status.Errorf(codes.Internal, "write logs: %v", err) + } + task.LogLength += int64(len(rows)) + for _, n := range ns { + task.LogIndexes = append(task.LogIndexes, task.LogSize) + task.LogSize += int64(n) + } + + res.Msg.AckIndex = task.LogLength + + var remove func() + if req.Msg.NoMore { + task.LogInStorage = true + remove, err = actions.TransferLogs(ctx, task.LogFilename) + if err != nil { + return nil, status.Errorf(codes.Internal, "transfer logs: %v", err) + } + } + + if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil { + return nil, status.Errorf(codes.Internal, "update task: %v", err) + } + if remove != nil { + remove() + } + + return res, nil +} |