Backport #25560 by @wolfogre Fix #25451. Bugfixes: - When stopping the zombie or endless tasks, set `LogInStorage` to true after transferring the file to storage. It was missing, it could write to a nonexistent file in DBFS because `LogInStorage` was false. - Always update `ActionTask.Updated` when there's a new state reported by the runner, even if there's no change. This is to avoid the task being judged as a zombie task. Enhancement: - Support `Stat()` for DBFS file. - `WriteLogs` refuses to write if it could result in content holes. Co-authored-by: Jason Song <i@wolfogre.com>tags/v1.20.0
return err | return err | ||||
} | } | ||||
// UpdateTaskByState updates the task by the state. | |||||
// It will always update the task if the state is not final, even there is no change. | |||||
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task. | |||||
func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionTask, error) { | func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionTask, error) { | ||||
stepStates := map[int64]*runnerv1.StepState{} | stepStates := map[int64]*runnerv1.StepState{} | ||||
for _, v := range state.Steps { | for _, v := range state.Steps { | ||||
}, nil); err != nil { | }, nil); err != nil { | ||||
return nil, err | return nil, err | ||||
} | } | ||||
} else { | |||||
// Force update ActionTask.Updated to avoid the task being judged as a zombie task | |||||
task.Updated = timeutil.TimeStampNow() | |||||
if err := UpdateTask(ctx, task, "updated"); err != nil { | |||||
return nil, err | |||||
} | |||||
} | } | ||||
if err := task.LoadAttributes(ctx); err != nil { | if err := task.LoadAttributes(ctx); err != nil { |
"context" | "context" | ||||
"errors" | "errors" | ||||
"io" | "io" | ||||
"io/fs" | |||||
"os" | "os" | ||||
"path/filepath" | "path/filepath" | ||||
"strconv" | "strconv" | ||||
type File interface { | type File interface { | ||||
io.ReadWriteCloser | io.ReadWriteCloser | ||||
io.Seeker | io.Seeker | ||||
fs.File | |||||
} | } | ||||
type file struct { | type file struct { | ||||
return nil | return nil | ||||
} | } | ||||
func (f *file) Stat() (os.FileInfo, error) { | |||||
if f.metaID == 0 { | |||||
return nil, os.ErrInvalid | |||||
} | |||||
fileMeta, err := findFileMetaByID(f.ctx, f.metaID) | |||||
if err != nil { | |||||
return nil, err | |||||
} | |||||
return fileMeta, nil | |||||
} | |||||
func timeToFileTimestamp(t time.Time) int64 { | func timeToFileTimestamp(t time.Time) int64 { | ||||
return t.UnixMicro() | return t.UnixMicro() | ||||
} | } | ||||
func fileTimestampToTime(timestamp int64) time.Time { | |||||
return time.UnixMicro(timestamp) | |||||
} | |||||
func (f *file) loadMetaByPath() (*dbfsMeta, error) { | func (f *file) loadMetaByPath() (*dbfsMeta, error) { | ||||
var fileMeta dbfsMeta | var fileMeta dbfsMeta | ||||
if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil { | if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil { |
import ( | import ( | ||||
"context" | "context" | ||||
"io/fs" | |||||
"os" | "os" | ||||
"path" | |||||
"time" | |||||
"code.gitea.io/gitea/models/db" | "code.gitea.io/gitea/models/db" | ||||
) | ) | ||||
defer f.Close() | defer f.Close() | ||||
return f.delete() | return f.delete() | ||||
} | } | ||||
var _ fs.FileInfo = (*dbfsMeta)(nil) | |||||
func (m *dbfsMeta) Name() string { | |||||
return path.Base(m.FullPath) | |||||
} | |||||
func (m *dbfsMeta) Size() int64 { | |||||
return m.FileSize | |||||
} | |||||
func (m *dbfsMeta) Mode() fs.FileMode { | |||||
return os.ModePerm | |||||
} | |||||
func (m *dbfsMeta) ModTime() time.Time { | |||||
return fileTimestampToTime(m.ModifyTimestamp) | |||||
} | |||||
func (m *dbfsMeta) IsDir() bool { | |||||
return false | |||||
} | |||||
func (m *dbfsMeta) Sys() any { | |||||
return nil | |||||
} |
_, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) | _, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) | ||||
assert.Error(t, err) | assert.Error(t, err) | ||||
// test stat | |||||
f, err = OpenFile(db.DefaultContext, "test/test.txt", os.O_RDWR|os.O_CREATE) | |||||
assert.NoError(t, err) | |||||
stat, err := f.Stat() | |||||
assert.NoError(t, err) | |||||
assert.EqualValues(t, "test.txt", stat.Name()) | |||||
assert.EqualValues(t, 0, stat.Size()) | |||||
_, err = f.Write([]byte("0123456789")) | |||||
assert.NoError(t, err) | |||||
stat, err = f.Stat() | |||||
assert.NoError(t, err) | |||||
assert.EqualValues(t, 10, stat.Size()) | |||||
} | } | ||||
func TestDbfsReadWrite(t *testing.T) { | func TestDbfsReadWrite(t *testing.T) { |
) | ) | ||||
func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { | func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { | ||||
flag := os.O_WRONLY | |||||
if offset == 0 { | |||||
// Create file only if offset is 0, or it could result in content holes if the file doesn't exist. | |||||
flag |= os.O_CREATE | |||||
} | |||||
name := DBFSPrefix + filename | name := DBFSPrefix + filename | ||||
f, err := dbfs.OpenFile(ctx, name, os.O_WRONLY|os.O_CREATE) | |||||
f, err := dbfs.OpenFile(ctx, name, flag) | |||||
if err != nil { | if err != nil { | ||||
return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) | return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) | ||||
} | } | ||||
defer f.Close() | defer f.Close() | ||||
stat, err := f.Stat() | |||||
if err != nil { | |||||
return nil, fmt.Errorf("dbfs Stat %q: %w", name, err) | |||||
} | |||||
if stat.Size() < offset { | |||||
// If the size is less than offset, refuse to write, or it could result in content holes. | |||||
// However, if the size is greater than offset, we can still write to overwrite the content. | |||||
return nil, fmt.Errorf("size of %q is less than offset", name) | |||||
} | |||||
if _, err := f.Seek(offset, io.SeekStart); err != nil { | if _, err := f.Seek(offset, io.SeekStart); err != nil { | ||||
return nil, fmt.Errorf("dbfs Seek %q: %w", name, err) | return nil, fmt.Errorf("dbfs Seek %q: %w", name, err) | ||||
} | } |
return nil | return nil | ||||
}); err != nil { | }); err != nil { | ||||
log.Warn("Cannot stop task %v: %v", task.ID, err) | log.Warn("Cannot stop task %v: %v", task.ID, err) | ||||
// go on | |||||
} else if remove, err := actions.TransferLogs(ctx, task.LogFilename); err != nil { | |||||
continue | |||||
} | |||||
remove, err := actions.TransferLogs(ctx, task.LogFilename) | |||||
if err != nil { | |||||
log.Warn("Cannot transfer logs of task %v: %v", task.ID, err) | log.Warn("Cannot transfer logs of task %v: %v", task.ID, err) | ||||
} else { | |||||
remove() | |||||
continue | |||||
} | |||||
task.LogInStorage = true | |||||
if err := actions_model.UpdateTask(ctx, task, "log_in_storage"); err != nil { | |||||
log.Warn("Cannot update task %v: %v", task.ID, err) | |||||
continue | |||||
} | } | ||||
remove() | |||||
} | } | ||||
CreateCommitStatus(ctx, jobs...) | CreateCommitStatus(ctx, jobs...) |