diff options
author | Lunny Xiao <xiaolunwen@gmail.com> | 2021-11-18 14:47:57 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-18 14:47:57 +0800 |
commit | 1f1ae571393440c0260a00f98b4ebcfd162c7970 (patch) | |
tree | ed7f38619a16885f9df2c7292dfbf35e37745286 /services | |
parent | 3c3855a05c79ccc7a8495cdbf4a361e7bef74248 (diff) | |
download | gitea-1f1ae571393440c0260a00f98b4ebcfd162c7970.tar.gz gitea-1f1ae571393440c0260a00f98b4ebcfd162c7970.zip |
Move task from modules to services (#17680)
Diffstat (limited to 'services')
-rw-r--r-- | services/task/migrate.go | 143 | ||||
-rw-r--r-- | services/task/task.go | 130 |
2 files changed, 273 insertions, 0 deletions
diff --git a/services/task/migrate.go b/services/task/migrate.go new file mode 100644 index 0000000000..100aac1967 --- /dev/null +++ b/services/task/migrate.go @@ -0,0 +1,143 @@ +// Copyright 2019 Gitea. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package task + +import ( + "context" + "errors" + "fmt" + "strings" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/migration" + "code.gitea.io/gitea/modules/notification" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/services/migrations" +) + +func handleCreateError(owner *models.User, err error) error { + switch { + case models.IsErrReachLimitOfRepo(err): + return fmt.Errorf("You have already reached your limit of %d repositories", owner.MaxCreationLimit()) + case models.IsErrRepoAlreadyExist(err): + return errors.New("The repository name is already used") + case models.IsErrNameReserved(err): + return fmt.Errorf("The repository name '%s' is reserved", err.(models.ErrNameReserved).Name) + case models.IsErrNamePatternNotAllowed(err): + return fmt.Errorf("The pattern '%s' is not allowed in a repository name", err.(models.ErrNamePatternNotAllowed).Pattern) + default: + return err + } +} + +func runMigrateTask(t *models.Task) (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("PANIC whilst trying to do migrate task: %v", e) + log.Critical("PANIC during runMigrateTask[%d] by DoerID[%d] to RepoID[%d] for OwnerID[%d]: %v\nStacktrace: %v", t.ID, t.DoerID, t.RepoID, t.OwnerID, e, log.Stack(2)) + } + + if err == nil { + err = models.FinishMigrateTask(t) + if err == nil { + notification.NotifyMigrateRepository(t.Doer, t.Owner, t.Repo) + return + } + + log.Error("FinishMigrateTask[%d] by DoerID[%d] to RepoID[%d] for OwnerID[%d] failed: %v", t.ID, t.DoerID, t.RepoID, t.OwnerID, err) + } + + t.EndTime = timeutil.TimeStampNow() + t.Status = structs.TaskStatusFailed + t.Message = err.Error() + // Ensure that the repo loaded before we zero out the repo ID from the task - thus ensuring that we can delete it + _ = t.LoadRepo() + + t.RepoID = 0 + if err := t.UpdateCols("status", "errors", "repo_id", "end_time"); err != nil { + log.Error("Task UpdateCols failed: %v", err) + } + + if t.Repo != nil { + if errDelete := models.DeleteRepository(t.Doer, t.OwnerID, t.Repo.ID); errDelete != nil { + log.Error("DeleteRepository: %v", errDelete) + } + } + }() + + if err = t.LoadRepo(); err != nil { + return + } + + // if repository is ready, then just finish the task + if t.Repo.Status == models.RepositoryReady { + return nil + } + + if err = t.LoadDoer(); err != nil { + return + } + if err = t.LoadOwner(); err != nil { + return + } + + var opts *migration.MigrateOptions + opts, err = t.MigrateConfig() + if err != nil { + return + } + + opts.MigrateToRepoID = t.RepoID + + ctx, cancel := context.WithCancel(graceful.GetManager().ShutdownContext()) + defer cancel() + pm := process.GetManager() + pid := pm.Add(fmt.Sprintf("MigrateTask: %s/%s", t.Owner.Name, opts.RepoName), cancel) + defer pm.Remove(pid) + + t.StartTime = timeutil.TimeStampNow() + t.Status = structs.TaskStatusRunning + if err = t.UpdateCols("start_time", "status"); err != nil { + return + } + + t.Repo, err = migrations.MigrateRepository(ctx, t.Doer, t.Owner.Name, *opts, func(format string, args ...interface{}) { + message := models.TranslatableMessage{ + Format: format, + Args: args, + } + bs, _ := json.Marshal(message) + t.Message = string(bs) + _ = t.UpdateCols("message") + }) + if err == nil { + log.Trace("Repository migrated [%d]: %s/%s", t.Repo.ID, t.Owner.Name, t.Repo.Name) + return + } + + if models.IsErrRepoAlreadyExist(err) { + err = errors.New("The repository name is already used") + return + } + + // remoteAddr may contain credentials, so we sanitize it + err = util.NewStringURLSanitizedError(err, opts.CloneAddr, true) + if strings.Contains(err.Error(), "Authentication failed") || + strings.Contains(err.Error(), "could not read Username") { + return fmt.Errorf("Authentication failed: %v", err.Error()) + } else if strings.Contains(err.Error(), "fatal:") { + return fmt.Errorf("Migration failed: %v", err.Error()) + } + + // do not be tempted to coalesce this line with the return + err = handleCreateError(t.Owner, err) + return +} diff --git a/services/task/task.go b/services/task/task.go new file mode 100644 index 0000000000..f538b36efc --- /dev/null +++ b/services/task/task.go @@ -0,0 +1,130 @@ +// Copyright 2019 Gitea. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package task + +import ( + "fmt" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + base "code.gitea.io/gitea/modules/migration" + "code.gitea.io/gitea/modules/queue" + repo_module "code.gitea.io/gitea/modules/repository" + "code.gitea.io/gitea/modules/secret" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" +) + +// taskQueue is a global queue of tasks +var taskQueue queue.Queue + +// Run a task +func Run(t *models.Task) error { + switch t.Type { + case structs.TaskTypeMigrateRepo: + return runMigrateTask(t) + default: + return fmt.Errorf("Unknown task type: %d", t.Type) + } +} + +// Init will start the service to get all unfinished tasks and run them +func Init() error { + taskQueue = queue.CreateQueue("task", handle, &models.Task{}) + + if taskQueue == nil { + return fmt.Errorf("Unable to create Task Queue") + } + + go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) + + return nil +} + +func handle(data ...queue.Data) { + for _, datum := range data { + task := datum.(*models.Task) + if err := Run(task); err != nil { + log.Error("Run task failed: %v", err) + } + } +} + +// MigrateRepository add migration repository to task +func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error { + task, err := CreateMigrateTask(doer, u, opts) + if err != nil { + return err + } + + return taskQueue.Push(task) +} + +// CreateMigrateTask creates a migrate task +func CreateMigrateTask(doer, u *models.User, opts base.MigrateOptions) (*models.Task, error) { + // encrypt credentials for persistence + var err error + opts.CloneAddrEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.CloneAddr) + if err != nil { + return nil, err + } + opts.CloneAddr = util.NewStringURLSanitizer(opts.CloneAddr, true).Replace(opts.CloneAddr) + opts.AuthPasswordEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthPassword) + if err != nil { + return nil, err + } + opts.AuthPassword = "" + opts.AuthTokenEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthToken) + if err != nil { + return nil, err + } + opts.AuthToken = "" + bs, err := json.Marshal(&opts) + if err != nil { + return nil, err + } + + var task = &models.Task{ + DoerID: doer.ID, + OwnerID: u.ID, + Type: structs.TaskTypeMigrateRepo, + Status: structs.TaskStatusQueue, + PayloadContent: string(bs), + } + + if err := models.CreateTask(task); err != nil { + return nil, err + } + + repo, err := repo_module.CreateRepository(doer, u, models.CreateRepoOptions{ + Name: opts.RepoName, + Description: opts.Description, + OriginalURL: opts.OriginalURL, + GitServiceType: opts.GitServiceType, + IsPrivate: opts.Private, + IsMirror: opts.Mirror, + Status: models.RepositoryBeingMigrated, + }) + if err != nil { + task.EndTime = timeutil.TimeStampNow() + task.Status = structs.TaskStatusFailed + err2 := task.UpdateCols("end_time", "status") + if err2 != nil { + log.Error("UpdateCols Failed: %v", err2.Error()) + } + return nil, err + } + + task.RepoID = repo.ID + if err = task.UpdateCols("repo_id"); err != nil { + return nil, err + } + + return task, nil +} |