diff --git a/pkg/cluster/master.go b/pkg/cluster/master.go index 3d6fff14..7307a6ce 100644 --- a/pkg/cluster/master.go +++ b/pkg/cluster/master.go @@ -102,16 +102,19 @@ func (node *MasterNode) Kill() { // GetAria2Instance 获取主机Aria2实例 func (node *MasterNode) GetAria2Instance() common.Aria2 { node.lock.RLock() - defer node.lock.RUnlock() if !node.Model.Aria2Enabled { + node.lock.RUnlock() return &common.DummyAria2{} } if !node.aria2RPC.Initialized { + node.lock.RUnlock() + node.aria2RPC.Init() return &common.DummyAria2{} } + defer node.lock.RUnlock() return &node.aria2RPC } diff --git a/pkg/filesystem/driver/handler.go b/pkg/filesystem/driver/handler.go new file mode 100644 index 00000000..758d3861 --- /dev/null +++ b/pkg/filesystem/driver/handler.go @@ -0,0 +1,39 @@ +package driver + +import ( + "context" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "io" + "net/url" +) + +// Handler 存储策略适配器 +type Handler interface { + // 上传文件, dst为文件存储路径,size 为文件大小。上下文关闭 + // 时,应取消上传并清理临时文件 + Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error + + // 删除一个或多个给定路径的文件,返回删除失败的文件路径列表及错误 + Delete(ctx context.Context, files []string) ([]string, error) + + // 获取文件内容 + Get(ctx context.Context, path string) (response.RSCloser, error) + + // 获取缩略图,可直接在ContentResponse中返回文件数据流,也可指 + // 定为重定向 + Thumb(ctx context.Context, path string) (*response.ContentResponse, error) + + // 获取外链/下载地址, + // url - 站点本身地址, + // isDownload - 是否直接下载 + Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) + + // Token 获取有效期为ttl的上传凭证和签名,同时回调会话有效期为sessionTTL + Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) + + // List 递归列取远程端path路径下文件、目录,不包含path本身, + // 返回的对象路径以path作为起始根目录. + // recursive - 是否递归列出 + List(ctx context.Context, path string, recursive bool) ([]response.Object, error) +} diff --git a/pkg/filesystem/driver/shadow/slave/errors.go b/pkg/filesystem/driver/shadow/slave/errors.go new file mode 100644 index 00000000..4a42c0f6 --- /dev/null +++ b/pkg/filesystem/driver/shadow/slave/errors.go @@ -0,0 +1,7 @@ +package slave + +import "errors" + +var ( + ErrNotImplemented = errors.New("This method of shadowed policy is not implemented") +) diff --git a/pkg/filesystem/driver/shadow/slave/handler.go b/pkg/filesystem/driver/shadow/slave/handler.go new file mode 100644 index 00000000..cd3596f7 --- /dev/null +++ b/pkg/filesystem/driver/shadow/slave/handler.go @@ -0,0 +1,57 @@ +package slave + +import ( + "context" + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "io" + "net/url" +) + +// Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果 +type Driver struct { + node cluster.Node + handler driver.Handler + policy *model.Policy +} + +// NewDriver 返回新的从机指派处理器 +func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler { + return &Driver{ + node: node, + handler: handler, + policy: policy, + } +} + +func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error { + + panic("implement me") +} + +func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) { + panic("implement me") +} + +func (d Driver) Get(ctx context.Context, path string) (response.RSCloser, error) { + panic("implement me") +} + +func (d Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) { + panic("implement me") +} + +func (d Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) { + panic("implement me") +} + +func (d Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) { + panic("implement me") +} + +func (d Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { + panic("implement me") +} diff --git a/pkg/filesystem/filesystem.go b/pkg/filesystem/filesystem.go index 31dd2c46..52871fb9 100644 --- a/pkg/filesystem/filesystem.go +++ b/pkg/filesystem/filesystem.go @@ -1,9 +1,10 @@ package filesystem import ( - "context" "errors" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slave" "io" "net/http" "net/url" @@ -20,7 +21,6 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/remote" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/s3" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/upyun" - "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/gin-gonic/gin" @@ -44,36 +44,6 @@ type FileHeader interface { GetVirtualPath() string } -// Handler 存储策略适配器 -type Handler interface { - // 上传文件, dst为文件存储路径,size 为文件大小。上下文关闭 - // 时,应取消上传并清理临时文件 - Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error - - // 删除一个或多个给定路径的文件,返回删除失败的文件路径列表及错误 - Delete(ctx context.Context, files []string) ([]string, error) - - // 获取文件内容 - Get(ctx context.Context, path string) (response.RSCloser, error) - - // 获取缩略图,可直接在ContentResponse中返回文件数据流,也可指 - // 定为重定向 - Thumb(ctx context.Context, path string) (*response.ContentResponse, error) - - // 获取外链/下载地址, - // url - 站点本身地址, - // isDownload - 是否直接下载 - Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) - - // Token 获取有效期为ttl的上传凭证和签名,同时回调会话有效期为sessionTTL - Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) - - // List 递归列取远程端path路径下文件、目录,不包含path本身, - // 返回的对象路径以path作为起始根目录. - // recursive - 是否递归列出 - List(ctx context.Context, path string, recursive bool) ([]response.Object, error) -} - // FileSystem 管理文件的文件系统 type FileSystem struct { // 文件系统所有者 @@ -97,7 +67,7 @@ type FileSystem struct { /* 文件系统处理适配器 */ - Handler Handler + Handler driver.Handler // 回收锁 recycleLock sync.Mutex @@ -273,7 +243,7 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) { // SwitchToSlaveHandler 将负责上传的 Handler 切换为从机节点 func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) { - + fs.Handler = slave.NewDriver(node, fs.Handler, &fs.User.Policy) } // SetTargetFile 设置当前处理的目标文件 diff --git a/pkg/filesystem/fsctx/context.go b/pkg/filesystem/fsctx/context.go index 28a2653f..d2806381 100644 --- a/pkg/filesystem/fsctx/context.go +++ b/pkg/filesystem/fsctx/context.go @@ -41,4 +41,6 @@ const ( ValidateCapacityOnceCtx // 禁止上传时同名覆盖操作 DisableOverwrite + // 文件在从机节点中的路径 + SlaveSrcPath ) diff --git a/pkg/task/tranfer.go b/pkg/task/tranfer.go index 331d86fd..2c7de05b 100644 --- a/pkg/task/tranfer.go +++ b/pkg/task/tranfer.go @@ -108,6 +108,7 @@ func (job *TransferTask) Do() { } ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true) + ctx = context.WithValue(ctx, fsctx.SlaveSrcPath, file) if job.TaskProps.NodeID > 1 { // 指定为从机中转 @@ -121,6 +122,7 @@ func (job *TransferTask) Do() { fs.SwitchToSlaveHandler(node) err = fs.UploadFromStream(ctx, nil, dst, job.TaskProps.SrcSizes[file]) } else { + // 主机节点中转 err = fs.UploadFromPath(ctx, file, dst) }