summaryrefslogtreecommitdiffstats
path: root/routers/api/actions/runner/runner.go
diff options
context:
space:
mode:
Diffstat (limited to 'routers/api/actions/runner/runner.go')
-rw-r--r--routers/api/actions/runner/runner.go221
1 files changed, 221 insertions, 0 deletions
diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go
new file mode 100644
index 0000000000..7dbab9da0a
--- /dev/null
+++ b/routers/api/actions/runner/runner.go
@@ -0,0 +1,221 @@
+// Copyright 2022 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package runner
+
+import (
+ "context"
+ "errors"
+ "net/http"
+
+ actions_model "code.gitea.io/gitea/models/actions"
+ "code.gitea.io/gitea/modules/actions"
+ "code.gitea.io/gitea/modules/json"
+ "code.gitea.io/gitea/modules/log"
+ actions_service "code.gitea.io/gitea/services/actions"
+
+ runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+ "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
+ "github.com/bufbuild/connect-go"
+ gouuid "github.com/google/uuid"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func NewRunnerServiceHandler() (string, http.Handler) {
+ return runnerv1connect.NewRunnerServiceHandler(
+ &Service{},
+ connect.WithCompressMinBytes(1024),
+ withRunner,
+ )
+}
+
+var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
+
+type Service struct {
+ runnerv1connect.UnimplementedRunnerServiceHandler
+}
+
+// Register for new runner.
+func (s *Service) Register(
+ ctx context.Context,
+ req *connect.Request[runnerv1.RegisterRequest],
+) (*connect.Response[runnerv1.RegisterResponse], error) {
+ if req.Msg.Token == "" || req.Msg.Name == "" {
+ return nil, errors.New("missing runner token, name")
+ }
+
+ runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
+ if err != nil {
+ return nil, errors.New("runner token not found")
+ }
+
+ if runnerToken.IsActive {
+ return nil, errors.New("runner token has already activated")
+ }
+
+ // create new runner
+ runner := &actions_model.ActionRunner{
+ UUID: gouuid.New().String(),
+ Name: req.Msg.Name,
+ OwnerID: runnerToken.OwnerID,
+ RepoID: runnerToken.RepoID,
+ AgentLabels: req.Msg.AgentLabels,
+ CustomLabels: req.Msg.CustomLabels,
+ }
+ if err := runner.GenerateToken(); err != nil {
+ return nil, errors.New("can't generate token")
+ }
+
+ // create new runner
+ if err := actions_model.CreateRunner(ctx, runner); err != nil {
+ return nil, errors.New("can't create new runner")
+ }
+
+ // update token status
+ runnerToken.IsActive = true
+ if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
+ return nil, errors.New("can't update runner token status")
+ }
+
+ res := connect.NewResponse(&runnerv1.RegisterResponse{
+ Runner: &runnerv1.Runner{
+ Id: runner.ID,
+ Uuid: runner.UUID,
+ Token: runner.Token,
+ Name: runner.Name,
+ AgentLabels: runner.AgentLabels,
+ CustomLabels: runner.CustomLabels,
+ },
+ })
+
+ return res, nil
+}
+
+// FetchTask assigns a task to the runner
+func (s *Service) FetchTask(
+ ctx context.Context,
+ req *connect.Request[runnerv1.FetchTaskRequest],
+) (*connect.Response[runnerv1.FetchTaskResponse], error) {
+ runner := GetRunner(ctx)
+
+ var task *runnerv1.Task
+ if t, ok, err := pickTask(ctx, runner); err != nil {
+ log.Error("pick task failed: %v", err)
+ return nil, status.Errorf(codes.Internal, "pick task: %v", err)
+ } else if ok {
+ task = t
+ }
+
+ res := connect.NewResponse(&runnerv1.FetchTaskResponse{
+ Task: task,
+ })
+ return res, nil
+}
+
+// UpdateTask updates the task status.
+func (s *Service) UpdateTask(
+ ctx context.Context,
+ req *connect.Request[runnerv1.UpdateTaskRequest],
+) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
+ {
+ // to debug strange runner behaviors, it could be removed if all problems have been solved.
+ stateMsg, _ := json.Marshal(req.Msg.State)
+ log.Trace("update task with state: %s", stateMsg)
+ }
+
+ // Get Task first
+ task, err := actions_model.GetTaskByID(ctx, req.Msg.State.Id)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "can't find the task: %v", err)
+ }
+ if task.Status.IsCancelled() {
+ return connect.NewResponse(&runnerv1.UpdateTaskResponse{
+ State: &runnerv1.TaskState{
+ Id: req.Msg.State.Id,
+ Result: task.Status.AsResult(),
+ },
+ }), nil
+ }
+
+ task, err = actions_model.UpdateTaskByState(ctx, req.Msg.State)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "update task: %v", err)
+ }
+
+ if err := task.LoadJob(ctx); err != nil {
+ return nil, status.Errorf(codes.Internal, "load job: %v", err)
+ }
+
+ if err := actions_service.CreateCommitStatus(ctx, task.Job); err != nil {
+ log.Error("Update commit status failed: %v", err)
+ // go on
+ }
+
+ if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
+ if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
+ log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
+ }
+ }
+
+ return connect.NewResponse(&runnerv1.UpdateTaskResponse{
+ State: &runnerv1.TaskState{
+ Id: req.Msg.State.Id,
+ Result: task.Status.AsResult(),
+ },
+ }), nil
+}
+
+// UpdateLog uploads log of the task.
+func (s *Service) UpdateLog(
+ ctx context.Context,
+ req *connect.Request[runnerv1.UpdateLogRequest],
+) (*connect.Response[runnerv1.UpdateLogResponse], error) {
+ res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
+
+ task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "get task: %v", err)
+ }
+ ack := task.LogLength
+
+ if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
+ res.Msg.AckIndex = ack
+ return res, nil
+ }
+
+ if task.LogInStorage {
+ return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
+ }
+
+ rows := req.Msg.Rows[ack-req.Msg.Index:]
+ ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "write logs: %v", err)
+ }
+ task.LogLength += int64(len(rows))
+ for _, n := range ns {
+ task.LogIndexes = append(task.LogIndexes, task.LogSize)
+ task.LogSize += int64(n)
+ }
+
+ res.Msg.AckIndex = task.LogLength
+
+ var remove func()
+ if req.Msg.NoMore {
+ task.LogInStorage = true
+ remove, err = actions.TransferLogs(ctx, task.LogFilename)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
+ }
+ }
+
+ if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
+ return nil, status.Errorf(codes.Internal, "update task: %v", err)
+ }
+ if remove != nil {
+ remove()
+ }
+
+ return res, nil
+}