diff options
Diffstat (limited to 'models')
-rw-r--r-- | models/branches.go | 4 | ||||
-rw-r--r-- | models/models.go | 5 | ||||
-rw-r--r-- | models/pull_list.go | 9 | ||||
-rw-r--r-- | models/repo.go | 64 | ||||
-rw-r--r-- | models/repo_indexer.go | 43 | ||||
-rw-r--r-- | models/user.go | 35 |
6 files changed, 142 insertions, 18 deletions
diff --git a/models/branches.go b/models/branches.go index bb99cffa05..045019314a 100644 --- a/models/branches.go +++ b/models/branches.go @@ -5,6 +5,7 @@ package models import ( + "context" "fmt" "time" @@ -525,7 +526,8 @@ func (deletedBranch *DeletedBranch) LoadUser() { } // RemoveOldDeletedBranches removes old deleted branches -func RemoveOldDeletedBranches() { +func RemoveOldDeletedBranches(ctx context.Context) { + // Nothing to do for shutdown or terminate log.Trace("Doing: DeletedBranchesCleanup") deleteBefore := time.Now().Add(-setting.Cron.DeletedBranchesCleanup.OlderThan) diff --git a/models/models.go b/models/models.go index 8c10e7abfc..9eb174e200 100644 --- a/models/models.go +++ b/models/models.go @@ -6,6 +6,7 @@ package models import ( + "context" "database/sql" "errors" "fmt" @@ -164,11 +165,13 @@ func SetEngine() (err error) { } // NewEngine initializes a new xorm.Engine -func NewEngine(migrateFunc func(*xorm.Engine) error) (err error) { +func NewEngine(ctx context.Context, migrateFunc func(*xorm.Engine) error) (err error) { if err = SetEngine(); err != nil { return err } + x.SetDefaultContext(ctx) + if err = x.Ping(); err != nil { return err } diff --git a/models/pull_list.go b/models/pull_list.go index 49d04ba0b8..1376978353 100644 --- a/models/pull_list.go +++ b/models/pull_list.go @@ -68,11 +68,12 @@ func GetUnmergedPullRequestsByBaseInfo(repoID int64, branch string) ([]*PullRequ Find(&prs) } -// GetPullRequestsByCheckStatus returns all pull requests according the special checking status. -func GetPullRequestsByCheckStatus(status PullRequestStatus) ([]*PullRequest, error) { - prs := make([]*PullRequest, 0, 10) - return prs, x. +// GetPullRequestIDsByCheckStatus returns all pull requests according the special checking status. +func GetPullRequestIDsByCheckStatus(status PullRequestStatus) ([]int64, error) { + prs := make([]int64, 0, 10) + return prs, x.Table("pull_request"). Where("status=?", status). + Cols("pull_request.id"). Find(&prs) } diff --git a/models/repo.go b/models/repo.go index f4ac75b8f0..c7eee3c1ec 100644 --- a/models/repo.go +++ b/models/repo.go @@ -7,6 +7,7 @@ package models import ( "bytes" + "context" "crypto/md5" "errors" "fmt" @@ -2098,19 +2099,27 @@ func DeleteRepositoryArchives() error { } // DeleteOldRepositoryArchives deletes old repository archives. -func DeleteOldRepositoryArchives() { +func DeleteOldRepositoryArchives(ctx context.Context) { log.Trace("Doing: ArchiveCleanup") - if err := x.Where("id > 0").Iterate(new(Repository), deleteOldRepositoryArchives); err != nil { + if err := x.Where("id > 0").Iterate(new(Repository), func(idx int, bean interface{}) error { + return deleteOldRepositoryArchives(ctx, idx, bean) + }); err != nil { log.Error("ArchiveClean: %v", err) } } -func deleteOldRepositoryArchives(idx int, bean interface{}) error { +func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{}) error { repo := bean.(*Repository) basePath := filepath.Join(repo.RepoPath(), "archives") for _, ty := range []string{"zip", "targz"} { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s", repo, ty) + default: + } + path := filepath.Join(basePath, ty) file, err := os.Open(path) if err != nil { @@ -2133,6 +2142,11 @@ func deleteOldRepositoryArchives(idx int, bean interface{}) error { minimumOldestTime := time.Now().Add(-setting.Cron.ArchiveCleanup.OlderThan) for _, info := range files { if info.ModTime().Before(minimumOldestTime) && !info.IsDir() { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s - %s", repo, ty, info.Name()) + default: + } toDelete := filepath.Join(path, info.Name()) // This is a best-effort purge, so we do not check error codes to confirm removal. if err = os.Remove(toDelete); err != nil { @@ -2226,13 +2240,17 @@ func SyncRepositoryHooks() error { } // GitFsck calls 'git fsck' to check repository health. -func GitFsck() { +func GitFsck(ctx context.Context) { log.Trace("Doing: GitFsck") - if err := x. Where("id>0 AND is_fsck_enabled=?", true).BufferSize(setting.Database.IterateBufferSize). Iterate(new(Repository), func(idx int, bean interface{}) error { + select { + case <-ctx.Done(): + return fmt.Errorf("Aborted due to shutdown") + default: + } repo := bean.(*Repository) repoPath := repo.RepoPath() log.Trace("Running health check on repository %s", repoPath) @@ -2278,13 +2296,19 @@ type repoChecker struct { desc string } -func repoStatsCheck(checker *repoChecker) { +func repoStatsCheck(ctx context.Context, checker *repoChecker) { results, err := x.Query(checker.querySQL) if err != nil { log.Error("Select %s: %v", checker.desc, err) return } for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", checker.desc, id) _, err = x.Exec(checker.correctSQL, id, id) @@ -2295,7 +2319,7 @@ func repoStatsCheck(checker *repoChecker) { } // CheckRepoStats checks the repository stats -func CheckRepoStats() { +func CheckRepoStats(ctx context.Context) { log.Trace("Doing: CheckRepoStats") checkers := []*repoChecker{ @@ -2331,7 +2355,13 @@ func CheckRepoStats() { }, } for i := range checkers { - repoStatsCheck(checkers[i]) + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + repoStatsCheck(ctx, checkers[i]) + } } // ***** START: Repository.NumClosedIssues ***** @@ -2341,6 +2371,12 @@ func CheckRepoStats() { log.Error("Select %s: %v", desc, err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", desc, id) _, err = x.Exec("UPDATE `repository` SET num_closed_issues=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, false, id) @@ -2358,6 +2394,12 @@ func CheckRepoStats() { log.Error("Select %s: %v", desc, err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating %s: %d", desc, id) _, err = x.Exec("UPDATE `repository` SET num_closed_pulls=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, true, id) @@ -2375,6 +2417,12 @@ func CheckRepoStats() { log.Error("Select repository count 'num_forks': %v", err) } else { for _, result := range results { + select { + case <-ctx.Done(): + log.Warn("CheckRepoStats: Aborting due to shutdown") + return + default: + } id := com.StrTo(result["id"]).MustInt64() log.Trace("Updating repository count 'num_forks': %d", id) diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 138ef54d33..aee3c74b35 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -4,6 +4,12 @@ package models +import ( + "fmt" + + "xorm.io/builder" +) + // RepoIndexerStatus status of a repo's entry in the repo indexer // For now, implicitly refers to default branch type RepoIndexerStatus struct { @@ -12,6 +18,31 @@ type RepoIndexerStatus struct { CommitSha string `xorm:"VARCHAR(40)"` } +// GetUnindexedRepos returns repos which do not have an indexer status +func GetUnindexedRepos(maxRepoID int64, page, pageSize int) ([]int64, error) { + ids := make([]int64, 0, 50) + cond := builder.Cond(builder.IsNull{ + "repo_indexer_status.id", + }) + sess := x.Table("repository").Join("LEFT OUTER", "repo_indexer_status", "repository.id = repo_indexer_status.repo_id") + if maxRepoID > 0 { + cond = builder.And(cond, builder.Lte{ + "repository.id": maxRepoID, + }) + } + if page >= 0 && pageSize > 0 { + start := 0 + if page > 0 { + start = (page - 1) * pageSize + } + sess.Limit(pageSize, start) + } + + sess.Where(cond).Cols("repository.id").Desc("repository.id") + err := sess.Find(&ids) + return ids, err +} + // GetIndexerStatus loads repo codes indxer status func (repo *Repository) GetIndexerStatus() error { if repo.IndexerStatus != nil { @@ -31,15 +62,21 @@ func (repo *Repository) GetIndexerStatus() error { // UpdateIndexerStatus updates indexer status func (repo *Repository) UpdateIndexerStatus(sha string) error { if err := repo.GetIndexerStatus(); err != nil { - return err + return fmt.Errorf("UpdateIndexerStatus: Unable to getIndexerStatus for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err) } if len(repo.IndexerStatus.CommitSha) == 0 { repo.IndexerStatus.CommitSha = sha _, err := x.Insert(repo.IndexerStatus) - return err + if err != nil { + return fmt.Errorf("UpdateIndexerStatus: Unable to insert repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) + } + return nil } repo.IndexerStatus.CommitSha = sha _, err := x.ID(repo.IndexerStatus.ID).Cols("commit_sha"). Update(repo.IndexerStatus) - return err + if err != nil { + return fmt.Errorf("UpdateIndexerStatus: Unable to update repoIndexerStatus for repo: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err) + } + return nil } diff --git a/models/user.go b/models/user.go index 2cef2e5dec..0454158de6 100644 --- a/models/user.go +++ b/models/user.go @@ -7,6 +7,7 @@ package models import ( "container/list" + "context" "crypto/md5" "crypto/sha256" "crypto/subtle" @@ -1695,7 +1696,7 @@ func synchronizeLdapSSHPublicKeys(usr *User, s *LoginSource, sshPublicKeys []str } // SyncExternalUsers is used to synchronize users with external authorization source -func SyncExternalUsers() { +func SyncExternalUsers(ctx context.Context) { log.Trace("Doing: SyncExternalUsers") ls, err := LoginSources() @@ -1710,6 +1711,12 @@ func SyncExternalUsers() { if !s.IsActived || !s.IsSyncEnabled { continue } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name) + return + default: + } if s.IsLDAP() { log.Trace("Doing: SyncExternalUsers[%s]", s.Name) @@ -1727,6 +1734,12 @@ func SyncExternalUsers() { log.Error("SyncExternalUsers: %v", err) return } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name) + return + default: + } sr, err := s.LDAP().SearchEntries() if err != nil { @@ -1735,6 +1748,19 @@ func SyncExternalUsers() { } for _, su := range sr { + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before completed update of users", s.Name) + // Rewrite authorized_keys file if LDAP Public SSH Key attribute is set and any key was added or removed + if sshKeysNeedUpdate { + err = RewriteAllPublicKeys() + if err != nil { + log.Error("RewriteAllPublicKeys: %v", err) + } + } + return + default: + } if len(su.Username) == 0 { continue } @@ -1819,6 +1845,13 @@ func SyncExternalUsers() { } } + select { + case <-ctx.Done(): + log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before delete users", s.Name) + return + default: + } + // Deactivate users not present in LDAP if updateExisting { for _, usr := range users { |