From 99953825ff2616f4c62df81da33589949040f071 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sun, 29 Aug 2021 20:32:03 +0800 Subject: [PATCH] Feat: prototype for slave driven filesystem --- pkg/aria2/monitor/monitor.go | 11 +++++++++-- pkg/filesystem/filesystem.go | 8 ++++++-- pkg/task/job.go | 4 +++- pkg/task/tranfer.go | 31 ++++++++++++++++++++++++++----- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/pkg/aria2/monitor/monitor.go b/pkg/aria2/monitor/monitor.go index 92336de6..08d3d9ff 100644 --- a/pkg/aria2/monitor/monitor.go +++ b/pkg/aria2/monitor/monitor.go @@ -239,17 +239,24 @@ func (monitor *Monitor) RemoveTempFolder() { func (monitor *Monitor) Complete(status rpc.StatusInfo) bool { // 创建中转任务 file := make([]string, 0, len(monitor.Task.StatusInfo.Files)) + sizes := make(map[string]uint64, len(monitor.Task.StatusInfo.Files)) for i := 0; i < len(monitor.Task.StatusInfo.Files); i++ { - if monitor.Task.StatusInfo.Files[i].Selected == "true" { - file = append(file, monitor.Task.StatusInfo.Files[i].Path) + fileInfo := monitor.Task.StatusInfo.Files[i] + if fileInfo.Selected == "true" { + file = append(file, fileInfo.Path) + size, _ := strconv.ParseUint(fileInfo.Length, 10, 64) + sizes[fileInfo.Path] = size } } + job, err := task.NewTransferTask( monitor.Task.UserID, file, monitor.Task.Dst, monitor.Task.Parent, true, + monitor.node.ID(), + sizes, ) if err != nil { monitor.setErrorStatus(err) diff --git a/pkg/filesystem/filesystem.go b/pkg/filesystem/filesystem.go index af62983a..31dd2c46 100644 --- a/pkg/filesystem/filesystem.go +++ b/pkg/filesystem/filesystem.go @@ -3,6 +3,7 @@ package filesystem import ( "context" "errors" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "io" "net/http" "net/url" @@ -134,7 +135,6 @@ func NewFileSystem(user *model.User) (*FileSystem, error) { // 分配存储策略适配器 err := fs.DispatchHandler() - // TODO 分配默认钩子 return fs, err } @@ -159,7 +159,6 @@ func NewAnonymousFileSystem() (*FileSystem, error) { } // DispatchHandler 根据存储策略分配文件适配器 -// TODO 完善测试 func (fs *FileSystem) DispatchHandler() error { var policyType string var currentPolicy *model.Policy @@ -272,6 +271,11 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) { return fs, err } +// SwitchToSlaveHandler 将负责上传的 Handler 切换为从机节点 +func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) { + +} + // SetTargetFile 设置当前处理的目标文件 func (fs *FileSystem) SetTargetFile(files *[]model.File) { if len(fs.FileTarget) == 0 { diff --git a/pkg/task/job.go b/pkg/task/job.go index 22adc791..064b078a 100644 --- a/pkg/task/job.go +++ b/pkg/task/job.go @@ -96,7 +96,9 @@ func Resume() { continue } - TaskPoll.Submit(job) + if job != nil { + TaskPoll.Submit(job) + } } } diff --git a/pkg/task/tranfer.go b/pkg/task/tranfer.go index 8cdc2474..331d86fd 100644 --- a/pkg/task/tranfer.go +++ b/pkg/task/tranfer.go @@ -9,6 +9,7 @@ import ( "strings" model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/util" @@ -26,11 +27,14 @@ type TransferTask struct { // TransferProps 中转任务属性 type TransferProps struct { - Src []string `json:"src"` // 原始文件 - Parent string `json:"parent"` // 父目录 - Dst string `json:"dst"` // 目的目录ID + Src []string `json:"src"` // 原始文件 + SrcSizes map[string]uint64 `json:"src_size"` // 原始文件的大小信息,从机转存时使用 + Parent string `json:"parent"` // 父目录 + Dst string `json:"dst"` // 目的目录ID // 将会保留原始文件的目录结构,Src 除去 Parent 开头作为最终路径 TrimPath bool `json:"trim_path"` + // 负责处理中专任务的节点ID + NodeID uint `json:"node_id"` } // Props 获取任务属性 @@ -104,7 +108,22 @@ func (job *TransferTask) Do() { } ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true) - err = fs.UploadFromPath(ctx, file, dst) + if job.TaskProps.NodeID > 1 { + // 指定为从机中转 + + // 获取从机节点 + node := cluster.Default.GetNodeByID(job.TaskProps.NodeID) + if node == nil { + job.SetErrorMsg("从机节点不可用", nil) + } + + // 切换为从机节点处理上传 + fs.SwitchToSlaveHandler(node) + err = fs.UploadFromStream(ctx, nil, dst, job.TaskProps.SrcSizes[file]) + } else { + err = fs.UploadFromPath(ctx, file, dst) + } + if err != nil { job.SetErrorMsg("文件转存失败", err) } @@ -122,7 +141,7 @@ func (job *TransferTask) Recycle() { } // NewTransferTask 新建中转任务 -func NewTransferTask(user uint, src []string, dst, parent string, trim bool) (Job, error) { +func NewTransferTask(user uint, src []string, dst, parent string, trim bool, node uint, sizes map[string]uint64) (Job, error) { creator, err := model.GetActiveUserByID(user) if err != nil { return nil, err @@ -135,6 +154,8 @@ func NewTransferTask(user uint, src []string, dst, parent string, trim bool) (Jo Parent: parent, Dst: dst, TrimPath: trim, + NodeID: node, + SrcSizes: sizes, }, }