diff options
Diffstat (limited to 'modules/process/manager.go')
-rw-r--r-- | modules/process/manager.go | 183 |
1 files changed, 138 insertions, 45 deletions
diff --git a/modules/process/manager.go b/modules/process/manager.go index e42e38a0f0..10a89d04dd 100644 --- a/modules/process/manager.go +++ b/modules/process/manager.go @@ -12,6 +12,7 @@ import ( "io" "os/exec" "sort" + "strconv" "sync" "time" ) @@ -28,57 +29,151 @@ var ( DefaultContext = context.Background() ) -// Process represents a working process inheriting from Gitea. -type Process struct { - PID int64 // Process ID, not system one. - Description string - Start time.Time - Cancel context.CancelFunc -} +// IDType is a pid type +type IDType string + +// FinishedFunc is a function that marks that the process is finished and can be removed from the process table +// - it is simply an alias for context.CancelFunc and is only for documentary purposes +type FinishedFunc = context.CancelFunc -// Manager knows about all processes and counts PIDs. +// Manager manages all processes and counts PIDs. type Manager struct { mutex sync.Mutex - counter int64 - processes map[int64]*Process + next int64 + lastTime int64 + + processes map[IDType]*Process } // GetManager returns a Manager and initializes one as singleton if there's none yet func GetManager() *Manager { managerInit.Do(func() { manager = &Manager{ - processes: make(map[int64]*Process), + processes: make(map[IDType]*Process), + next: 1, } }) return manager } -// Add a process to the ProcessManager and returns its PID. -func (pm *Manager) Add(description string, cancel context.CancelFunc) int64 { +// AddContext creates a new context and adds it as a process. Once the process is finished, finished must be called +// to remove the process from the process table. It should not be called until the process is finished but must always be called. +// +// cancel should be used to cancel the returned context, however it will not remove the process from the process table. +// finished will cancel the returned context and remove it from the process table. +// +// Most processes will not need to use the cancel function but there will be cases whereby you want to cancel the process but not immediately remove it from the +// process table. +func (pm *Manager) AddContext(parent context.Context, description string) (ctx context.Context, cancel context.CancelFunc, finished FinishedFunc) { + parentPID := GetParentPID(parent) + + ctx, cancel = context.WithCancel(parent) + + pid, finished := pm.Add(parentPID, description, cancel) + + return &Context{ + Context: ctx, + pid: pid, + }, cancel, finished +} + +// AddContextTimeout creates a new context and add it as a process. Once the process is finished, finished must be called +// to remove the process from the process table. It should not be called until the process is finsihed but must always be called. +// +// cancel should be used to cancel the returned context, however it will not remove the process from the process table. +// finished will cancel the returned context and remove it from the process table. +// +// Most processes will not need to use the cancel function but there will be cases whereby you want to cancel the process but not immediately remove it from the +// process table. +func (pm *Manager) AddContextTimeout(parent context.Context, timeout time.Duration, description string) (ctx context.Context, cancel context.CancelFunc, finshed FinishedFunc) { + parentPID := GetParentPID(parent) + + ctx, cancel = context.WithTimeout(parent, timeout) + + pid, finshed := pm.Add(parentPID, description, cancel) + + return &Context{ + Context: ctx, + pid: pid, + }, cancel, finshed +} + +// Add create a new process +func (pm *Manager) Add(parentPID IDType, description string, cancel context.CancelFunc) (IDType, FinishedFunc) { pm.mutex.Lock() - pid := pm.counter + 1 - pm.processes[pid] = &Process{ + start, pid := pm.nextPID() + + parent := pm.processes[parentPID] + if parent == nil { + parentPID = "" + } + + process := &Process{ PID: pid, + ParentPID: parentPID, Description: description, - Start: time.Now(), + Start: start, Cancel: cancel, } - pm.counter = pid + + finished := func() { + cancel() + pm.remove(process) + } + + if parent != nil { + parent.AddChild(process) + } + pm.processes[pid] = process pm.mutex.Unlock() - return pid + return pid, finished +} + +// nextPID will return the next available PID. pm.mutex should already be locked. +func (pm *Manager) nextPID() (start time.Time, pid IDType) { + start = time.Now() + startUnix := start.Unix() + if pm.lastTime == startUnix { + pm.next++ + } else { + pm.next = 1 + } + pm.lastTime = startUnix + pid = IDType(strconv.FormatInt(start.Unix(), 16)) + + if pm.next == 1 { + return + } + pid = IDType(string(pid) + "-" + strconv.FormatInt(pm.next, 10)) + return } // Remove a process from the ProcessManager. -func (pm *Manager) Remove(pid int64) { +func (pm *Manager) Remove(pid IDType) { pm.mutex.Lock() delete(pm.processes, pid) pm.mutex.Unlock() } +func (pm *Manager) remove(process *Process) { + pm.mutex.Lock() + if p := pm.processes[process.PID]; p == process { + delete(pm.processes, process.PID) + } + parent := pm.processes[process.ParentPID] + pm.mutex.Unlock() + + if parent == nil { + return + } + + parent.RemoveChild(process) +} + // Cancel a process in the ProcessManager. -func (pm *Manager) Cancel(pid int64) { +func (pm *Manager) Cancel(pid IDType) { pm.mutex.Lock() process, ok := pm.processes[pid] pm.mutex.Unlock() @@ -88,14 +183,28 @@ func (pm *Manager) Cancel(pid int64) { } // Processes gets the processes in a thread safe manner -func (pm *Manager) Processes() []*Process { +func (pm *Manager) Processes(onlyRoots bool) []*Process { pm.mutex.Lock() processes := make([]*Process, 0, len(pm.processes)) - for _, process := range pm.processes { - processes = append(processes, process) + if onlyRoots { + for _, process := range pm.processes { + if _, has := pm.processes[process.ParentPID]; !has { + processes = append(processes, process) + } + } + } else { + for _, process := range pm.processes { + processes = append(processes, process) + } } pm.mutex.Unlock() - sort.Sort(processList(processes)) + + sort.Slice(processes, func(i, j int) bool { + left, right := processes[i], processes[j] + + return left.Start.Before(right.Start) + }) + return processes } @@ -134,8 +243,8 @@ func (pm *Manager) ExecDirEnvStdIn(timeout time.Duration, dir, desc string, env stdOut := new(bytes.Buffer) stdErr := new(bytes.Buffer) - ctx, cancel := context.WithTimeout(DefaultContext, timeout) - defer cancel() + ctx, _, finished := pm.AddContextTimeout(DefaultContext, timeout, desc) + defer finished() cmd := exec.CommandContext(ctx, cmdName, args...) cmd.Dir = dir @@ -150,13 +259,11 @@ func (pm *Manager) ExecDirEnvStdIn(timeout time.Duration, dir, desc string, env return "", "", err } - pid := pm.Add(desc, cancel) err := cmd.Wait() - pm.Remove(pid) if err != nil { err = &Error{ - PID: pid, + PID: GetPID(ctx), Description: desc, Err: err, CtxErr: ctx.Err(), @@ -168,23 +275,9 @@ func (pm *Manager) ExecDirEnvStdIn(timeout time.Duration, dir, desc string, env return stdOut.String(), stdErr.String(), err } -type processList []*Process - -func (l processList) Len() int { - return len(l) -} - -func (l processList) Less(i, j int) bool { - return l[i].PID < l[j].PID -} - -func (l processList) Swap(i, j int) { - l[i], l[j] = l[j], l[i] -} - // Error is a wrapped error describing the error results of Process Execution type Error struct { - PID int64 + PID IDType Description string Err error CtxErr error @@ -193,7 +286,7 @@ type Error struct { } func (err *Error) Error() string { - return fmt.Sprintf("exec(%d:%s) failed: %v(%v) stdout: %s stderr: %s", err.PID, err.Description, err.Err, err.CtxErr, err.Stdout, err.Stderr) + return fmt.Sprintf("exec(%s:%s) failed: %v(%v) stdout: %s stderr: %s", err.PID, err.Description, err.Err, err.CtxErr, err.Stdout, err.Stderr) } // Unwrap implements the unwrappable implicit interface for go1.13 Unwrap() |