diff options
author | zeripath <art27@cantab.net> | 2021-11-30 20:06:32 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-30 20:06:32 +0000 |
commit | 01087e9eef21ff5ea1cebbb1e84933954671fdf2 (patch) | |
tree | ae618785a3bd46e012096683e2fd2309f87c571d /modules/process | |
parent | d894c90b703ce215e2319ae2e2bf95989f77805d (diff) | |
download | gitea-01087e9eef21ff5ea1cebbb1e84933954671fdf2.tar.gz gitea-01087e9eef21ff5ea1cebbb1e84933954671fdf2.zip |
Make Requests Processes and create process hierarchy. Associate OpenRepository with context. (#17125)
This PR registers requests with the process manager and manages hierarchy within the processes.
Git repos are then associated with a context, (usually the request's context) - with sub commands using this context as their base context.
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to 'modules/process')
-rw-r--r-- | modules/process/context.go | 69 | ||||
-rw-r--r-- | modules/process/manager.go | 183 | ||||
-rw-r--r-- | modules/process/manager_test.go | 64 | ||||
-rw-r--r-- | modules/process/process.go | 66 |
4 files changed, 319 insertions, 63 deletions
diff --git a/modules/process/context.go b/modules/process/context.go new file mode 100644 index 0000000000..6df5bc1513 --- /dev/null +++ b/modules/process/context.go @@ -0,0 +1,69 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package process + +import ( + "context" +) + +// Context is a wrapper around context.Context and contains the current pid for this context +type Context struct { + context.Context + pid IDType +} + +// GetPID returns the PID for this context +func (c *Context) GetPID() IDType { + return c.pid +} + +// GetParent returns the parent process context (if any) +func (c *Context) GetParent() *Context { + return GetContext(c.Context) +} + +// Value is part of the interface for context.Context. We mostly defer to the internal context - but we return this in response to the ProcessContextKey +func (c *Context) Value(key interface{}) interface{} { + if key == ProcessContextKey { + return c + } + return c.Context.Value(key) +} + +// ProcessContextKey is the key under which process contexts are stored +var ProcessContextKey interface{} = "process-context" + +// GetContext will return a process context if one exists +func GetContext(ctx context.Context) *Context { + if pCtx, ok := ctx.(*Context); ok { + return pCtx + } + pCtxInterface := ctx.Value(ProcessContextKey) + if pCtxInterface == nil { + return nil + } + if pCtx, ok := pCtxInterface.(*Context); ok { + return pCtx + } + return nil +} + +// GetPID returns the PID for this context +func GetPID(ctx context.Context) IDType { + pCtx := GetContext(ctx) + if pCtx == nil { + return "" + } + return pCtx.GetPID() +} + +// GetParentPID returns the ParentPID for this context +func GetParentPID(ctx context.Context) IDType { + var parentPID IDType + if parentProcess := GetContext(ctx); parentProcess != nil { + parentPID = parentProcess.GetPID() + } + return parentPID +} diff --git a/modules/process/manager.go b/modules/process/manager.go index e42e38a0f0..10a89d04dd 100644 --- a/modules/process/manager.go +++ b/modules/process/manager.go @@ -12,6 +12,7 @@ import ( "io" "os/exec" "sort" + "strconv" "sync" "time" ) @@ -28,57 +29,151 @@ var ( DefaultContext = context.Background() ) -// Process represents a working process inheriting from Gitea. -type Process struct { - PID int64 // Process ID, not system one. - Description string - Start time.Time - Cancel context.CancelFunc -} +// IDType is a pid type +type IDType string + +// FinishedFunc is a function that marks that the process is finished and can be removed from the process table +// - it is simply an alias for context.CancelFunc and is only for documentary purposes +type FinishedFunc = context.CancelFunc -// Manager knows about all processes and counts PIDs. +// Manager manages all processes and counts PIDs. type Manager struct { mutex sync.Mutex - counter int64 - processes map[int64]*Process + next int64 + lastTime int64 + + processes map[IDType]*Process } // GetManager returns a Manager and initializes one as singleton if there's none yet func GetManager() *Manager { managerInit.Do(func() { manager = &Manager{ - processes: make(map[int64]*Process), + processes: make(map[IDType]*Process), + next: 1, } }) return manager } -// Add a process to the ProcessManager and returns its PID. -func (pm *Manager) Add(description string, cancel context.CancelFunc) int64 { +// AddContext creates a new context and adds it as a process. Once the process is finished, finished must be called +// to remove the process from the process table. It should not be called until the process is finished but must always be called. +// +// cancel should be used to cancel the returned context, however it will not remove the process from the process table. +// finished will cancel the returned context and remove it from the process table. +// +// Most processes will not need to use the cancel function but there will be cases whereby you want to cancel the process but not immediately remove it from the +// process table. +func (pm *Manager) AddContext(parent context.Context, description string) (ctx context.Context, cancel context.CancelFunc, finished FinishedFunc) { + parentPID := GetParentPID(parent) + + ctx, cancel = context.WithCancel(parent) + + pid, finished := pm.Add(parentPID, description, cancel) + + return &Context{ + Context: ctx, + pid: pid, + }, cancel, finished +} + +// AddContextTimeout creates a new context and add it as a process. Once the process is finished, finished must be called +// to remove the process from the process table. It should not be called until the process is finsihed but must always be called. +// +// cancel should be used to cancel the returned context, however it will not remove the process from the process table. +// finished will cancel the returned context and remove it from the process table. +// +// Most processes will not need to use the cancel function but there will be cases whereby you want to cancel the process but not immediately remove it from the +// process table. +func (pm *Manager) AddContextTimeout(parent context.Context, timeout time.Duration, description string) (ctx context.Context, cancel context.CancelFunc, finshed FinishedFunc) { + parentPID := GetParentPID(parent) + + ctx, cancel = context.WithTimeout(parent, timeout) + + pid, finshed := pm.Add(parentPID, description, cancel) + + return &Context{ + Context: ctx, + pid: pid, + }, cancel, finshed +} + +// Add create a new process +func (pm *Manager) Add(parentPID IDType, description string, cancel context.CancelFunc) (IDType, FinishedFunc) { pm.mutex.Lock() - pid := pm.counter + 1 - pm.processes[pid] = &Process{ + start, pid := pm.nextPID() + + parent := pm.processes[parentPID] + if parent == nil { + parentPID = "" + } + + process := &Process{ PID: pid, + ParentPID: parentPID, Description: description, - Start: time.Now(), + Start: start, Cancel: cancel, } - pm.counter = pid + + finished := func() { + cancel() + pm.remove(process) + } + + if parent != nil { + parent.AddChild(process) + } + pm.processes[pid] = process pm.mutex.Unlock() - return pid + return pid, finished +} + +// nextPID will return the next available PID. pm.mutex should already be locked. +func (pm *Manager) nextPID() (start time.Time, pid IDType) { + start = time.Now() + startUnix := start.Unix() + if pm.lastTime == startUnix { + pm.next++ + } else { + pm.next = 1 + } + pm.lastTime = startUnix + pid = IDType(strconv.FormatInt(start.Unix(), 16)) + + if pm.next == 1 { + return + } + pid = IDType(string(pid) + "-" + strconv.FormatInt(pm.next, 10)) + return } // Remove a process from the ProcessManager. -func (pm *Manager) Remove(pid int64) { +func (pm *Manager) Remove(pid IDType) { pm.mutex.Lock() delete(pm.processes, pid) pm.mutex.Unlock() } +func (pm *Manager) remove(process *Process) { + pm.mutex.Lock() + if p := pm.processes[process.PID]; p == process { + delete(pm.processes, process.PID) + } + parent := pm.processes[process.ParentPID] + pm.mutex.Unlock() + + if parent == nil { + return + } + + parent.RemoveChild(process) +} + // Cancel a process in the ProcessManager. -func (pm *Manager) Cancel(pid int64) { +func (pm *Manager) Cancel(pid IDType) { pm.mutex.Lock() process, ok := pm.processes[pid] pm.mutex.Unlock() @@ -88,14 +183,28 @@ func (pm *Manager) Cancel(pid int64) { } // Processes gets the processes in a thread safe manner -func (pm *Manager) Processes() []*Process { +func (pm *Manager) Processes(onlyRoots bool) []*Process { pm.mutex.Lock() processes := make([]*Process, 0, len(pm.processes)) - for _, process := range pm.processes { - processes = append(processes, process) + if onlyRoots { + for _, process := range pm.processes { + if _, has := pm.processes[process.ParentPID]; !has { + processes = append(processes, process) + } + } + } else { + for _, process := range pm.processes { + processes = append(processes, process) + } } pm.mutex.Unlock() - sort.Sort(processList(processes)) + + sort.Slice(processes, func(i, j int) bool { + left, right := processes[i], processes[j] + + return left.Start.Before(right.Start) + }) + return processes } @@ -134,8 +243,8 @@ func (pm *Manager) ExecDirEnvStdIn(timeout time.Duration, dir, desc string, env stdOut := new(bytes.Buffer) stdErr := new(bytes.Buffer) - ctx, cancel := context.WithTimeout(DefaultContext, timeout) - defer cancel() + ctx, _, finished := pm.AddContextTimeout(DefaultContext, timeout, desc) + defer finished() cmd := exec.CommandContext(ctx, cmdName, args...) cmd.Dir = dir @@ -150,13 +259,11 @@ func (pm *Manager) ExecDirEnvStdIn(timeout time.Duration, dir, desc string, env return "", "", err } - pid := pm.Add(desc, cancel) err := cmd.Wait() - pm.Remove(pid) if err != nil { err = &Error{ - PID: pid, + PID: GetPID(ctx), Description: desc, Err: err, CtxErr: ctx.Err(), @@ -168,23 +275,9 @@ func (pm *Manager) ExecDirEnvStdIn(timeout time.Duration, dir, desc string, env return stdOut.String(), stdErr.String(), err } -type processList []*Process - -func (l processList) Len() int { - return len(l) -} - -func (l processList) Less(i, j int) bool { - return l[i].PID < l[j].PID -} - -func (l processList) Swap(i, j int) { - l[i], l[j] = l[j], l[i] -} - // Error is a wrapped error describing the error results of Process Execution type Error struct { - PID int64 + PID IDType Description string Err error CtxErr error @@ -193,7 +286,7 @@ type Error struct { } func (err *Error) Error() string { - return fmt.Sprintf("exec(%d:%s) failed: %v(%v) stdout: %s stderr: %s", err.PID, err.Description, err.Err, err.CtxErr, err.Stdout, err.Stderr) + return fmt.Sprintf("exec(%s:%s) failed: %v(%v) stdout: %s stderr: %s", err.PID, err.Description, err.Err, err.CtxErr, err.Stdout, err.Stderr) } // Unwrap implements the unwrappable implicit interface for go1.13 Unwrap() diff --git a/modules/process/manager_test.go b/modules/process/manager_test.go index a515fc32cd..eb4228e72c 100644 --- a/modules/process/manager_test.go +++ b/modules/process/manager_test.go @@ -21,44 +21,72 @@ func TestGetManager(t *testing.T) { assert.NotNil(t, pm) } -func TestManager_Add(t *testing.T) { - pm := Manager{processes: make(map[int64]*Process)} +func TestManager_AddContext(t *testing.T) { + pm := Manager{processes: make(map[IDType]*Process), next: 1} - pid := pm.Add("foo", nil) - assert.Equal(t, int64(1), pid, "expected to get pid 1 got %d", pid) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p1Ctx, _, finished := pm.AddContext(ctx, "foo") + defer finished() + assert.NotEmpty(t, GetContext(p1Ctx).GetPID(), "expected to get non-empty pid") + + p2Ctx, _, finished := pm.AddContext(p1Ctx, "bar") + defer finished() + + assert.NotEmpty(t, GetContext(p2Ctx).GetPID(), "expected to get non-empty pid") - pid = pm.Add("bar", nil) - assert.Equal(t, int64(2), pid, "expected to get pid 2 got %d", pid) + assert.NotEqual(t, GetContext(p1Ctx).GetPID(), GetContext(p2Ctx).GetPID(), "expected to get different pids %s == %s", GetContext(p2Ctx).GetPID(), GetContext(p1Ctx).GetPID()) + assert.Equal(t, GetContext(p1Ctx).GetPID(), GetContext(p2Ctx).GetParent().GetPID(), "expected to get pid %s got %s", GetContext(p1Ctx).GetPID(), GetContext(p2Ctx).GetParent().GetPID()) } func TestManager_Cancel(t *testing.T) { - pm := Manager{processes: make(map[int64]*Process)} + pm := Manager{processes: make(map[IDType]*Process), next: 1} - ctx, cancel := context.WithCancel(context.Background()) - pid := pm.Add("foo", cancel) + ctx, _, finished := pm.AddContext(context.Background(), "foo") + defer finished() + + pm.Cancel(GetPID(ctx)) + + select { + case <-ctx.Done(): + default: + assert.Fail(t, "Cancel should cancel the provided context") + } + finished() - pm.Cancel(pid) + ctx, cancel, finished := pm.AddContext(context.Background(), "foo") + defer finished() + + cancel() select { case <-ctx.Done(): default: assert.Fail(t, "Cancel should cancel the provided context") } + finished() } func TestManager_Remove(t *testing.T) { - pm := Manager{processes: make(map[int64]*Process)} + pm := Manager{processes: make(map[IDType]*Process), next: 1} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p1Ctx, _, finished := pm.AddContext(ctx, "foo") + defer finished() + assert.NotEmpty(t, GetContext(p1Ctx).GetPID(), "expected to have non-empty PID") - pid1 := pm.Add("foo", nil) - assert.Equal(t, int64(1), pid1, "expected to get pid 1 got %d", pid1) + p2Ctx, _, finished := pm.AddContext(p1Ctx, "bar") + defer finished() - pid2 := pm.Add("bar", nil) - assert.Equal(t, int64(2), pid2, "expected to get pid 2 got %d", pid2) + assert.NotEqual(t, GetContext(p1Ctx).GetPID(), GetContext(p2Ctx).GetPID(), "expected to get different pids got %s == %s", GetContext(p2Ctx).GetPID(), GetContext(p1Ctx).GetPID()) - pm.Remove(pid2) + pm.Remove(GetPID(p2Ctx)) - _, exists := pm.processes[pid2] - assert.False(t, exists, "PID %d is in the list but shouldn't", pid2) + _, exists := pm.processes[GetPID(p2Ctx)] + assert.False(t, exists, "PID %d is in the list but shouldn't", GetPID(p2Ctx)) } func TestExecTimeoutNever(t *testing.T) { diff --git a/modules/process/process.go b/modules/process/process.go new file mode 100644 index 0000000000..662f878d7f --- /dev/null +++ b/modules/process/process.go @@ -0,0 +1,66 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package process + +import ( + "context" + "sync" + "time" +) + +// Process represents a working process inheriting from Gitea. +type Process struct { + PID IDType // Process ID, not system one. + ParentPID IDType + Description string + Start time.Time + Cancel context.CancelFunc + + lock sync.Mutex + children []*Process +} + +// Children gets the children of the process +// Note: this function will behave nicely even if p is nil +func (p *Process) Children() (children []*Process) { + if p == nil { + return + } + + p.lock.Lock() + defer p.lock.Unlock() + children = make([]*Process, len(p.children)) + copy(children, p.children) + return children +} + +// AddChild adds a child process +// Note: this function will behave nicely even if p is nil +func (p *Process) AddChild(child *Process) { + if p == nil { + return + } + + p.lock.Lock() + defer p.lock.Unlock() + p.children = append(p.children, child) +} + +// RemoveChild removes a child process +// Note: this function will behave nicely even if p is nil +func (p *Process) RemoveChild(process *Process) { + if p == nil { + return + } + + p.lock.Lock() + defer p.lock.Unlock() + for i, child := range p.children { + if child == process { + p.children = append(p.children[:i], p.children[i+1:]...) + return + } + } +} |