]> source.dussan.org Git - gitea.git/commitdiff
Reduce unnecessary DB queries for Actions tasks (#25199)
authorsillyguodong <33891828+sillyguodong@users.noreply.github.com>
Mon, 24 Jul 2023 06:11:27 +0000 (14:11 +0800)
committerGitHub <noreply@github.com>
Mon, 24 Jul 2023 06:11:27 +0000 (06:11 +0000)
Close #24544

Changes:

- Create `action_tasks_version` table to store the latest version of
each scope (global, org and repo).
- When a job with the status of `waiting` is created, the tasks version
of the scopes it belongs to will increase.
- When the status of a job already in the database is updated to
`waiting`, the tasks version of the scopes it belongs to will increase.
- On Gitea side, in `FeatchTask()`, will try to query the
`action_tasks_version` record of the scope of the runner that call
`FetchTask()`. If the record does not exist, will insert a row. Then,
Gitea will compare the version passed from runner to Gitea with the
version in database, if inconsistent, try pick task. Gitea always
returns the latest version from database to the runner.

Related:

- Protocol: https://gitea.com/gitea/actions-proto-def/pulls/10
- Runner: https://gitea.com/gitea/act_runner/pulls/219

models/actions/run.go
models/actions/run_job.go
models/actions/task.go
models/actions/tasks_version.go [new file with mode: 0644]
models/migrations/migrations.go
models/migrations/v1_21/v267.go [new file with mode: 0644]
routers/api/actions/runner/runner.go

index 7b62ff884f4caa7a3d3ec067fa4d63124ac354bb..5396c612f6e38916c7cdd43ab54fdb7bd9aa09c7 100644 (file)
@@ -195,6 +195,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
        }
 
        runJobs := make([]*ActionRunJob, 0, len(jobs))
+       var hasWaiting bool
        for _, v := range jobs {
                id, job := v.Job()
                needs := job.Needs()
@@ -205,6 +206,8 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
                status := StatusWaiting
                if len(needs) > 0 || run.NeedApproval {
                        status = StatusBlocked
+               } else {
+                       hasWaiting = true
                }
                job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
                runJobs = append(runJobs, &ActionRunJob{
@@ -225,6 +228,13 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
                return err
        }
 
+       // if there is a job in the waiting status, increase tasks version.
+       if hasWaiting {
+               if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
+                       return err
+               }
+       }
+
        return commiter.Commit()
 }
 
index 0002e507704d536e14b08f3d38b5cc8eceb24580..c7620cd8bca2f849d7207651c1d484090fb8a292 100644 (file)
@@ -111,6 +111,13 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
                return affected, nil
        }
 
+       if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() {
+               // if the status of job changes to waiting again, increase tasks version.
+               if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
+                       return affected, err
+               }
+       }
+
        if job.RunID == 0 {
                var err error
                if job, err = GetRunJobByID(ctx, job.ID); err != nil {
index 55044ec82d105c01c5cb212bb4dfde3255786b6f..9cc0fd0df83dbd9523763470c1318b187f2ed1ff 100644 (file)
@@ -215,12 +215,11 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro
 }
 
 func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
-       dbCtx, commiter, err := db.TxContext(ctx)
+       ctx, commiter, err := db.TxContext(ctx)
        if err != nil {
                return nil, false, err
        }
        defer commiter.Close()
-       ctx = dbCtx.WithContext(ctx)
 
        e := db.GetEngine(ctx)
 
diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go
new file mode 100644 (file)
index 0000000..5c0a865
--- /dev/null
@@ -0,0 +1,105 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+       "context"
+
+       "code.gitea.io/gitea/models/db"
+       "code.gitea.io/gitea/modules/log"
+       "code.gitea.io/gitea/modules/timeutil"
+)
+
+// ActionTasksVersion
+// If both ownerID and repoID is zero, its scope is global.
+// If ownerID is not zero and repoID is zero, its scope is org (there is no user-level runner currrently).
+// If ownerID is zero and repoID is not zero, its scope is repo.
+type ActionTasksVersion struct {
+       ID          int64 `xorm:"pk autoincr"`
+       OwnerID     int64 `xorm:"UNIQUE(owner_repo)"`
+       RepoID      int64 `xorm:"INDEX UNIQUE(owner_repo)"`
+       Version     int64
+       CreatedUnix timeutil.TimeStamp `xorm:"created"`
+       UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
+}
+
+func init() {
+       db.RegisterModel(new(ActionTasksVersion))
+}
+
+func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, error) {
+       var tasksVersion ActionTasksVersion
+       has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion)
+       if err != nil {
+               return 0, err
+       } else if !has {
+               return 0, nil
+       }
+       return tasksVersion.Version, err
+}
+
+func insertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) {
+       tasksVersion := &ActionTasksVersion{
+               OwnerID: ownerID,
+               RepoID:  repoID,
+               Version: 1,
+       }
+       if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil {
+               return nil, err
+       }
+       return tasksVersion, nil
+}
+
+func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) error {
+       result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID)
+       if err != nil {
+               return err
+       }
+       affected, err := result.RowsAffected()
+       if err != nil {
+               return err
+       }
+
+       if affected == 0 {
+               // if update sql does not affect any rows, the database may be broken,
+               // so re-insert the row of version data here.
+               if _, err := insertTasksVersion(ctx, ownerID, repoID); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error {
+       ctx, commiter, err := db.TxContext(ctx)
+       if err != nil {
+               return err
+       }
+       defer commiter.Close()
+
+       // 1. increase global
+       if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil {
+               log.Error("IncreaseTasksVersionByScope(Global): %v", err)
+               return err
+       }
+
+       // 2. increase owner
+       if ownerID > 0 {
+               if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil {
+                       log.Error("IncreaseTasksVersionByScope(Owner): %v", err)
+                       return err
+               }
+       }
+
+       // 3. increase repo
+       if repoID > 0 {
+               if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil {
+                       log.Error("IncreaseTasksVersionByScope(Repo): %v", err)
+                       return err
+               }
+       }
+
+       return commiter.Commit()
+}
index 6599cb9cda3eb282e4294fb135cfcfa51e5e19fd..bfe4b56cd1ce9372429f583159273686a734048b 100644 (file)
@@ -515,6 +515,8 @@ var migrations = []Migration{
        NewMigration("Alter Actions Artifact table", v1_21.AlterActionArtifactTable),
        // v266 -> v267
        NewMigration("Reduce commit status", v1_21.ReduceCommitStatus),
+       // v267 -> v268
+       NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable),
 }
 
 // GetCurrentDBVersion returns the current db version
diff --git a/models/migrations/v1_21/v267.go b/models/migrations/v1_21/v267.go
new file mode 100644 (file)
index 0000000..bc0e954
--- /dev/null
@@ -0,0 +1,23 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package v1_21 //nolint
+
+import (
+       "code.gitea.io/gitea/modules/timeutil"
+
+       "xorm.io/xorm"
+)
+
+func CreateActionTasksVersionTable(x *xorm.Engine) error {
+       type ActionTasksVersion struct {
+               ID          int64 `xorm:"pk autoincr"`
+               OwnerID     int64 `xorm:"UNIQUE(owner_repo)"`
+               RepoID      int64 `xorm:"INDEX UNIQUE(owner_repo)"`
+               Version     int64
+               CreatedUnix timeutil.TimeStamp `xorm:"created"`
+               UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
+       }
+
+       return x.Sync(new(ActionTasksVersion))
+}
index 17801cb32202f852b78d66774219a7600adee06e..6de5964cb77bccc20df65fd9681c920e411db6ff 100644 (file)
@@ -127,20 +127,39 @@ func (s *Service) Declare(
 // FetchTask assigns a task to the runner
 func (s *Service) FetchTask(
        ctx context.Context,
-       _ *connect.Request[runnerv1.FetchTaskRequest],
+       req *connect.Request[runnerv1.FetchTaskRequest],
 ) (*connect.Response[runnerv1.FetchTaskResponse], error) {
        runner := GetRunner(ctx)
 
        var task *runnerv1.Task
-       if t, ok, err := pickTask(ctx, runner); err != nil {
-               log.Error("pick task failed: %v", err)
-               return nil, status.Errorf(codes.Internal, "pick task: %v", err)
-       } else if ok {
-               task = t
+       tasksVersion := req.Msg.TasksVersion // task version from runner
+       latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
+       if err != nil {
+               return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
+       } else if latestVersion == 0 {
+               if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
+                       return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
+               }
+               // if we don't increase the value of `latestVersion` here,
+               // the response of FetchTask will return tasksVersion as zero.
+               // and the runner will treat it as an old version of Gitea.
+               latestVersion++
        }
 
+       if tasksVersion != latestVersion {
+               // if the task version in request is not equal to the version in db,
+               // it means there may still be some tasks not be assgined.
+               // try to pick a task for the runner that send the request.
+               if t, ok, err := pickTask(ctx, runner); err != nil {
+                       log.Error("pick task failed: %v", err)
+                       return nil, status.Errorf(codes.Internal, "pick task: %v", err)
+               } else if ok {
+                       task = t
+               }
+       }
        res := connect.NewResponse(&runnerv1.FetchTaskResponse{
-               Task: task,
+               Task:         task,
+               TasksVersion: latestVersion,
        })
        return res, nil
 }