]> source.dussan.org Git - gitea.git/commitdiff
Move task from modules to services (#17680)
authorLunny Xiao <xiaolunwen@gmail.com>
Thu, 18 Nov 2021 06:47:57 +0000 (14:47 +0800)
committerGitHub <noreply@github.com>
Thu, 18 Nov 2021 06:47:57 +0000 (14:47 +0800)
modules/task/migrate.go [deleted file]
modules/task/task.go [deleted file]
routers/init.go
routers/web/repo/migrate.go
services/task/migrate.go [new file with mode: 0644]
services/task/task.go [new file with mode: 0644]

diff --git a/modules/task/migrate.go b/modules/task/migrate.go
deleted file mode 100644 (file)
index 100aac1..0000000
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright 2019 Gitea. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import (
-       "context"
-       "errors"
-       "fmt"
-       "strings"
-
-       "code.gitea.io/gitea/models"
-       "code.gitea.io/gitea/modules/graceful"
-       "code.gitea.io/gitea/modules/json"
-       "code.gitea.io/gitea/modules/log"
-       "code.gitea.io/gitea/modules/migration"
-       "code.gitea.io/gitea/modules/notification"
-       "code.gitea.io/gitea/modules/process"
-       "code.gitea.io/gitea/modules/structs"
-       "code.gitea.io/gitea/modules/timeutil"
-       "code.gitea.io/gitea/modules/util"
-       "code.gitea.io/gitea/services/migrations"
-)
-
-func handleCreateError(owner *models.User, err error) error {
-       switch {
-       case models.IsErrReachLimitOfRepo(err):
-               return fmt.Errorf("You have already reached your limit of %d repositories", owner.MaxCreationLimit())
-       case models.IsErrRepoAlreadyExist(err):
-               return errors.New("The repository name is already used")
-       case models.IsErrNameReserved(err):
-               return fmt.Errorf("The repository name '%s' is reserved", err.(models.ErrNameReserved).Name)
-       case models.IsErrNamePatternNotAllowed(err):
-               return fmt.Errorf("The pattern '%s' is not allowed in a repository name", err.(models.ErrNamePatternNotAllowed).Pattern)
-       default:
-               return err
-       }
-}
-
-func runMigrateTask(t *models.Task) (err error) {
-       defer func() {
-               if e := recover(); e != nil {
-                       err = fmt.Errorf("PANIC whilst trying to do migrate task: %v", e)
-                       log.Critical("PANIC during runMigrateTask[%d] by DoerID[%d] to RepoID[%d] for OwnerID[%d]: %v\nStacktrace: %v", t.ID, t.DoerID, t.RepoID, t.OwnerID, e, log.Stack(2))
-               }
-
-               if err == nil {
-                       err = models.FinishMigrateTask(t)
-                       if err == nil {
-                               notification.NotifyMigrateRepository(t.Doer, t.Owner, t.Repo)
-                               return
-                       }
-
-                       log.Error("FinishMigrateTask[%d] by DoerID[%d] to RepoID[%d] for OwnerID[%d] failed: %v", t.ID, t.DoerID, t.RepoID, t.OwnerID, err)
-               }
-
-               t.EndTime = timeutil.TimeStampNow()
-               t.Status = structs.TaskStatusFailed
-               t.Message = err.Error()
-               // Ensure that the repo loaded before we zero out the repo ID from the task - thus ensuring that we can delete it
-               _ = t.LoadRepo()
-
-               t.RepoID = 0
-               if err := t.UpdateCols("status", "errors", "repo_id", "end_time"); err != nil {
-                       log.Error("Task UpdateCols failed: %v", err)
-               }
-
-               if t.Repo != nil {
-                       if errDelete := models.DeleteRepository(t.Doer, t.OwnerID, t.Repo.ID); errDelete != nil {
-                               log.Error("DeleteRepository: %v", errDelete)
-                       }
-               }
-       }()
-
-       if err = t.LoadRepo(); err != nil {
-               return
-       }
-
-       // if repository is ready, then just finish the task
-       if t.Repo.Status == models.RepositoryReady {
-               return nil
-       }
-
-       if err = t.LoadDoer(); err != nil {
-               return
-       }
-       if err = t.LoadOwner(); err != nil {
-               return
-       }
-
-       var opts *migration.MigrateOptions
-       opts, err = t.MigrateConfig()
-       if err != nil {
-               return
-       }
-
-       opts.MigrateToRepoID = t.RepoID
-
-       ctx, cancel := context.WithCancel(graceful.GetManager().ShutdownContext())
-       defer cancel()
-       pm := process.GetManager()
-       pid := pm.Add(fmt.Sprintf("MigrateTask: %s/%s", t.Owner.Name, opts.RepoName), cancel)
-       defer pm.Remove(pid)
-
-       t.StartTime = timeutil.TimeStampNow()
-       t.Status = structs.TaskStatusRunning
-       if err = t.UpdateCols("start_time", "status"); err != nil {
-               return
-       }
-
-       t.Repo, err = migrations.MigrateRepository(ctx, t.Doer, t.Owner.Name, *opts, func(format string, args ...interface{}) {
-               message := models.TranslatableMessage{
-                       Format: format,
-                       Args:   args,
-               }
-               bs, _ := json.Marshal(message)
-               t.Message = string(bs)
-               _ = t.UpdateCols("message")
-       })
-       if err == nil {
-               log.Trace("Repository migrated [%d]: %s/%s", t.Repo.ID, t.Owner.Name, t.Repo.Name)
-               return
-       }
-
-       if models.IsErrRepoAlreadyExist(err) {
-               err = errors.New("The repository name is already used")
-               return
-       }
-
-       // remoteAddr may contain credentials, so we sanitize it
-       err = util.NewStringURLSanitizedError(err, opts.CloneAddr, true)
-       if strings.Contains(err.Error(), "Authentication failed") ||
-               strings.Contains(err.Error(), "could not read Username") {
-               return fmt.Errorf("Authentication failed: %v", err.Error())
-       } else if strings.Contains(err.Error(), "fatal:") {
-               return fmt.Errorf("Migration failed: %v", err.Error())
-       }
-
-       // do not be tempted to coalesce this line with the return
-       err = handleCreateError(t.Owner, err)
-       return
-}
diff --git a/modules/task/task.go b/modules/task/task.go
deleted file mode 100644 (file)
index f538b36..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-// Copyright 2019 Gitea. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package task
-
-import (
-       "fmt"
-
-       "code.gitea.io/gitea/models"
-       "code.gitea.io/gitea/modules/graceful"
-       "code.gitea.io/gitea/modules/json"
-       "code.gitea.io/gitea/modules/log"
-       base "code.gitea.io/gitea/modules/migration"
-       "code.gitea.io/gitea/modules/queue"
-       repo_module "code.gitea.io/gitea/modules/repository"
-       "code.gitea.io/gitea/modules/secret"
-       "code.gitea.io/gitea/modules/setting"
-       "code.gitea.io/gitea/modules/structs"
-       "code.gitea.io/gitea/modules/timeutil"
-       "code.gitea.io/gitea/modules/util"
-)
-
-// taskQueue is a global queue of tasks
-var taskQueue queue.Queue
-
-// Run a task
-func Run(t *models.Task) error {
-       switch t.Type {
-       case structs.TaskTypeMigrateRepo:
-               return runMigrateTask(t)
-       default:
-               return fmt.Errorf("Unknown task type: %d", t.Type)
-       }
-}
-
-// Init will start the service to get all unfinished tasks and run them
-func Init() error {
-       taskQueue = queue.CreateQueue("task", handle, &models.Task{})
-
-       if taskQueue == nil {
-               return fmt.Errorf("Unable to create Task Queue")
-       }
-
-       go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)
-
-       return nil
-}
-
-func handle(data ...queue.Data) {
-       for _, datum := range data {
-               task := datum.(*models.Task)
-               if err := Run(task); err != nil {
-                       log.Error("Run task failed: %v", err)
-               }
-       }
-}
-
-// MigrateRepository add migration repository to task
-func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error {
-       task, err := CreateMigrateTask(doer, u, opts)
-       if err != nil {
-               return err
-       }
-
-       return taskQueue.Push(task)
-}
-
-// CreateMigrateTask creates a migrate task
-func CreateMigrateTask(doer, u *models.User, opts base.MigrateOptions) (*models.Task, error) {
-       // encrypt credentials for persistence
-       var err error
-       opts.CloneAddrEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.CloneAddr)
-       if err != nil {
-               return nil, err
-       }
-       opts.CloneAddr = util.NewStringURLSanitizer(opts.CloneAddr, true).Replace(opts.CloneAddr)
-       opts.AuthPasswordEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthPassword)
-       if err != nil {
-               return nil, err
-       }
-       opts.AuthPassword = ""
-       opts.AuthTokenEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthToken)
-       if err != nil {
-               return nil, err
-       }
-       opts.AuthToken = ""
-       bs, err := json.Marshal(&opts)
-       if err != nil {
-               return nil, err
-       }
-
-       var task = &models.Task{
-               DoerID:         doer.ID,
-               OwnerID:        u.ID,
-               Type:           structs.TaskTypeMigrateRepo,
-               Status:         structs.TaskStatusQueue,
-               PayloadContent: string(bs),
-       }
-
-       if err := models.CreateTask(task); err != nil {
-               return nil, err
-       }
-
-       repo, err := repo_module.CreateRepository(doer, u, models.CreateRepoOptions{
-               Name:           opts.RepoName,
-               Description:    opts.Description,
-               OriginalURL:    opts.OriginalURL,
-               GitServiceType: opts.GitServiceType,
-               IsPrivate:      opts.Private,
-               IsMirror:       opts.Mirror,
-               Status:         models.RepositoryBeingMigrated,
-       })
-       if err != nil {
-               task.EndTime = timeutil.TimeStampNow()
-               task.Status = structs.TaskStatusFailed
-               err2 := task.UpdateCols("end_time", "status")
-               if err2 != nil {
-                       log.Error("UpdateCols Failed: %v", err2.Error())
-               }
-               return nil, err
-       }
-
-       task.RepoID = repo.ID
-       if err = task.UpdateCols("repo_id"); err != nil {
-               return nil, err
-       }
-
-       return task, nil
-}
index 762508cee52b23948ef78ba8fe3c666ffc79ea91..58c7384092215bc1683a0bfaa8f2ac6c5bc4db07 100644 (file)
@@ -29,7 +29,6 @@ import (
        "code.gitea.io/gitea/modules/ssh"
        "code.gitea.io/gitea/modules/storage"
        "code.gitea.io/gitea/modules/svg"
-       "code.gitea.io/gitea/modules/task"
        "code.gitea.io/gitea/modules/translation"
        "code.gitea.io/gitea/modules/web"
        apiv1 "code.gitea.io/gitea/routers/api/v1"
@@ -45,6 +44,7 @@ import (
        mirror_service "code.gitea.io/gitea/services/mirror"
        pull_service "code.gitea.io/gitea/services/pull"
        repo_service "code.gitea.io/gitea/services/repository"
+       "code.gitea.io/gitea/services/task"
        "code.gitea.io/gitea/services/webhook"
 
        "gitea.com/go-chi/session"
index f91c344e94b8c82fa5af63812a051e7904efa15b..5cfc334042d046158d67527205c7d6f73bbf76d3 100644 (file)
@@ -17,11 +17,11 @@ import (
        "code.gitea.io/gitea/modules/log"
        "code.gitea.io/gitea/modules/setting"
        "code.gitea.io/gitea/modules/structs"
-       "code.gitea.io/gitea/modules/task"
        "code.gitea.io/gitea/modules/util"
        "code.gitea.io/gitea/modules/web"
        "code.gitea.io/gitea/services/forms"
        "code.gitea.io/gitea/services/migrations"
+       "code.gitea.io/gitea/services/task"
 )
 
 const (
diff --git a/services/task/migrate.go b/services/task/migrate.go
new file mode 100644 (file)
index 0000000..100aac1
--- /dev/null
@@ -0,0 +1,143 @@
+// Copyright 2019 Gitea. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package task
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "strings"
+
+       "code.gitea.io/gitea/models"
+       "code.gitea.io/gitea/modules/graceful"
+       "code.gitea.io/gitea/modules/json"
+       "code.gitea.io/gitea/modules/log"
+       "code.gitea.io/gitea/modules/migration"
+       "code.gitea.io/gitea/modules/notification"
+       "code.gitea.io/gitea/modules/process"
+       "code.gitea.io/gitea/modules/structs"
+       "code.gitea.io/gitea/modules/timeutil"
+       "code.gitea.io/gitea/modules/util"
+       "code.gitea.io/gitea/services/migrations"
+)
+
+func handleCreateError(owner *models.User, err error) error {
+       switch {
+       case models.IsErrReachLimitOfRepo(err):
+               return fmt.Errorf("You have already reached your limit of %d repositories", owner.MaxCreationLimit())
+       case models.IsErrRepoAlreadyExist(err):
+               return errors.New("The repository name is already used")
+       case models.IsErrNameReserved(err):
+               return fmt.Errorf("The repository name '%s' is reserved", err.(models.ErrNameReserved).Name)
+       case models.IsErrNamePatternNotAllowed(err):
+               return fmt.Errorf("The pattern '%s' is not allowed in a repository name", err.(models.ErrNamePatternNotAllowed).Pattern)
+       default:
+               return err
+       }
+}
+
+func runMigrateTask(t *models.Task) (err error) {
+       defer func() {
+               if e := recover(); e != nil {
+                       err = fmt.Errorf("PANIC whilst trying to do migrate task: %v", e)
+                       log.Critical("PANIC during runMigrateTask[%d] by DoerID[%d] to RepoID[%d] for OwnerID[%d]: %v\nStacktrace: %v", t.ID, t.DoerID, t.RepoID, t.OwnerID, e, log.Stack(2))
+               }
+
+               if err == nil {
+                       err = models.FinishMigrateTask(t)
+                       if err == nil {
+                               notification.NotifyMigrateRepository(t.Doer, t.Owner, t.Repo)
+                               return
+                       }
+
+                       log.Error("FinishMigrateTask[%d] by DoerID[%d] to RepoID[%d] for OwnerID[%d] failed: %v", t.ID, t.DoerID, t.RepoID, t.OwnerID, err)
+               }
+
+               t.EndTime = timeutil.TimeStampNow()
+               t.Status = structs.TaskStatusFailed
+               t.Message = err.Error()
+               // Ensure that the repo loaded before we zero out the repo ID from the task - thus ensuring that we can delete it
+               _ = t.LoadRepo()
+
+               t.RepoID = 0
+               if err := t.UpdateCols("status", "errors", "repo_id", "end_time"); err != nil {
+                       log.Error("Task UpdateCols failed: %v", err)
+               }
+
+               if t.Repo != nil {
+                       if errDelete := models.DeleteRepository(t.Doer, t.OwnerID, t.Repo.ID); errDelete != nil {
+                               log.Error("DeleteRepository: %v", errDelete)
+                       }
+               }
+       }()
+
+       if err = t.LoadRepo(); err != nil {
+               return
+       }
+
+       // if repository is ready, then just finish the task
+       if t.Repo.Status == models.RepositoryReady {
+               return nil
+       }
+
+       if err = t.LoadDoer(); err != nil {
+               return
+       }
+       if err = t.LoadOwner(); err != nil {
+               return
+       }
+
+       var opts *migration.MigrateOptions
+       opts, err = t.MigrateConfig()
+       if err != nil {
+               return
+       }
+
+       opts.MigrateToRepoID = t.RepoID
+
+       ctx, cancel := context.WithCancel(graceful.GetManager().ShutdownContext())
+       defer cancel()
+       pm := process.GetManager()
+       pid := pm.Add(fmt.Sprintf("MigrateTask: %s/%s", t.Owner.Name, opts.RepoName), cancel)
+       defer pm.Remove(pid)
+
+       t.StartTime = timeutil.TimeStampNow()
+       t.Status = structs.TaskStatusRunning
+       if err = t.UpdateCols("start_time", "status"); err != nil {
+               return
+       }
+
+       t.Repo, err = migrations.MigrateRepository(ctx, t.Doer, t.Owner.Name, *opts, func(format string, args ...interface{}) {
+               message := models.TranslatableMessage{
+                       Format: format,
+                       Args:   args,
+               }
+               bs, _ := json.Marshal(message)
+               t.Message = string(bs)
+               _ = t.UpdateCols("message")
+       })
+       if err == nil {
+               log.Trace("Repository migrated [%d]: %s/%s", t.Repo.ID, t.Owner.Name, t.Repo.Name)
+               return
+       }
+
+       if models.IsErrRepoAlreadyExist(err) {
+               err = errors.New("The repository name is already used")
+               return
+       }
+
+       // remoteAddr may contain credentials, so we sanitize it
+       err = util.NewStringURLSanitizedError(err, opts.CloneAddr, true)
+       if strings.Contains(err.Error(), "Authentication failed") ||
+               strings.Contains(err.Error(), "could not read Username") {
+               return fmt.Errorf("Authentication failed: %v", err.Error())
+       } else if strings.Contains(err.Error(), "fatal:") {
+               return fmt.Errorf("Migration failed: %v", err.Error())
+       }
+
+       // do not be tempted to coalesce this line with the return
+       err = handleCreateError(t.Owner, err)
+       return
+}
diff --git a/services/task/task.go b/services/task/task.go
new file mode 100644 (file)
index 0000000..f538b36
--- /dev/null
@@ -0,0 +1,130 @@
+// Copyright 2019 Gitea. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package task
+
+import (
+       "fmt"
+
+       "code.gitea.io/gitea/models"
+       "code.gitea.io/gitea/modules/graceful"
+       "code.gitea.io/gitea/modules/json"
+       "code.gitea.io/gitea/modules/log"
+       base "code.gitea.io/gitea/modules/migration"
+       "code.gitea.io/gitea/modules/queue"
+       repo_module "code.gitea.io/gitea/modules/repository"
+       "code.gitea.io/gitea/modules/secret"
+       "code.gitea.io/gitea/modules/setting"
+       "code.gitea.io/gitea/modules/structs"
+       "code.gitea.io/gitea/modules/timeutil"
+       "code.gitea.io/gitea/modules/util"
+)
+
+// taskQueue is a global queue of tasks
+var taskQueue queue.Queue
+
+// Run a task
+func Run(t *models.Task) error {
+       switch t.Type {
+       case structs.TaskTypeMigrateRepo:
+               return runMigrateTask(t)
+       default:
+               return fmt.Errorf("Unknown task type: %d", t.Type)
+       }
+}
+
+// Init will start the service to get all unfinished tasks and run them
+func Init() error {
+       taskQueue = queue.CreateQueue("task", handle, &models.Task{})
+
+       if taskQueue == nil {
+               return fmt.Errorf("Unable to create Task Queue")
+       }
+
+       go graceful.GetManager().RunWithShutdownFns(taskQueue.Run)
+
+       return nil
+}
+
+func handle(data ...queue.Data) {
+       for _, datum := range data {
+               task := datum.(*models.Task)
+               if err := Run(task); err != nil {
+                       log.Error("Run task failed: %v", err)
+               }
+       }
+}
+
+// MigrateRepository add migration repository to task
+func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error {
+       task, err := CreateMigrateTask(doer, u, opts)
+       if err != nil {
+               return err
+       }
+
+       return taskQueue.Push(task)
+}
+
+// CreateMigrateTask creates a migrate task
+func CreateMigrateTask(doer, u *models.User, opts base.MigrateOptions) (*models.Task, error) {
+       // encrypt credentials for persistence
+       var err error
+       opts.CloneAddrEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.CloneAddr)
+       if err != nil {
+               return nil, err
+       }
+       opts.CloneAddr = util.NewStringURLSanitizer(opts.CloneAddr, true).Replace(opts.CloneAddr)
+       opts.AuthPasswordEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthPassword)
+       if err != nil {
+               return nil, err
+       }
+       opts.AuthPassword = ""
+       opts.AuthTokenEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthToken)
+       if err != nil {
+               return nil, err
+       }
+       opts.AuthToken = ""
+       bs, err := json.Marshal(&opts)
+       if err != nil {
+               return nil, err
+       }
+
+       var task = &models.Task{
+               DoerID:         doer.ID,
+               OwnerID:        u.ID,
+               Type:           structs.TaskTypeMigrateRepo,
+               Status:         structs.TaskStatusQueue,
+               PayloadContent: string(bs),
+       }
+
+       if err := models.CreateTask(task); err != nil {
+               return nil, err
+       }
+
+       repo, err := repo_module.CreateRepository(doer, u, models.CreateRepoOptions{
+               Name:           opts.RepoName,
+               Description:    opts.Description,
+               OriginalURL:    opts.OriginalURL,
+               GitServiceType: opts.GitServiceType,
+               IsPrivate:      opts.Private,
+               IsMirror:       opts.Mirror,
+               Status:         models.RepositoryBeingMigrated,
+       })
+       if err != nil {
+               task.EndTime = timeutil.TimeStampNow()
+               task.Status = structs.TaskStatusFailed
+               err2 := task.UpdateCols("end_time", "status")
+               if err2 != nil {
+                       log.Error("UpdateCols Failed: %v", err2.Error())
+               }
+               return nil, err
+       }
+
+       task.RepoID = repo.ID
+       if err = task.UpdateCols("repo_id"); err != nil {
+               return nil, err
+       }
+
+       return task, nil
+}