From 285611baf74c43c8f4b2bc193b2d23e9c4644b7c Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Mon, 28 Feb 2022 17:47:57 +0800 Subject: [PATCH] Feat: truncate file if uploaded chunk is overlapped --- pkg/auth/auth.go | 4 +-- pkg/filesystem/driver/local/handler.go | 14 +++++++++- pkg/filesystem/driver/remote/client.go | 34 ++++++++++++++++++++++--- pkg/filesystem/driver/remote/handler.go | 30 +++++++++------------- pkg/filesystem/hooks.go | 4 +-- pkg/serializer/upload.go | 8 +++--- routers/router.go | 2 +- service/explorer/upload.go | 7 +++-- 8 files changed, 69 insertions(+), 34 deletions(-) diff --git a/pkg/auth/auth.go b/pkg/auth/auth.go index 094a37f..84b5a5f 100644 --- a/pkg/auth/auth.go +++ b/pkg/auth/auth.go @@ -64,12 +64,12 @@ func CheckRequest(instance Auth, r *http.Request) error { return instance.Check(getSignContent(r), sign[0]) } -// getSignContent 签名请求 path、正文、以`X-`开头的 Header. 如果 Header 中包含 `X-Policy`, +// getSignContent 签名请求 path、正文、以`X-`开头的 Header. 如果请求 path 为从机上传 API, // 则不对正文签名。返回待签名/验证的字符串 func getSignContent(r *http.Request) (rawSignString string) { // 读取所有body正文 var body = []byte{} - if _, ok := r.Header["X-Cr-Policy"]; !ok { + if strings.Contains(r.URL.Path, "/api/v3/slave/upload/") { if r.Body != nil { body, _ = ioutil.ReadAll(r.Body) _ = r.Body.Close() diff --git a/pkg/filesystem/driver/local/handler.go b/pkg/filesystem/driver/local/handler.go index b2b621a..e91f838 100644 --- a/pkg/filesystem/driver/local/handler.go +++ b/pkg/filesystem/driver/local/handler.go @@ -136,8 +136,20 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error { return err } - if uint64(stat.Size()) != fileInfo.AppendStart { + if uint64(stat.Size()) < fileInfo.AppendStart { return errors.New("未上传完成的文件分片与预期大小不一致") + } else if uint64(stat.Size()) > fileInfo.AppendStart { + out.Close() + if err := handler.Truncate(ctx, dst, fileInfo.AppendStart); err != nil { + return fmt.Errorf("覆盖分片时发生错误: %w", err) + } + + out, err = os.OpenFile(dst, os.O_APPEND|os.O_CREATE|os.O_WRONLY, Perm) + defer out.Close() + if err != nil { + util.Log().Warning("无法打开或创建文件,%s", err) + return err + } } } diff --git a/pkg/filesystem/driver/remote/client.go b/pkg/filesystem/driver/remote/client.go index b2f7443..feadc00 100644 --- a/pkg/filesystem/driver/remote/client.go +++ b/pkg/filesystem/driver/remote/client.go @@ -1,18 +1,26 @@ package remote import ( + "context" "encoding/json" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "net/http" "net/url" + "path" "strings" ) +const ( + basePath = "/api/v3/slave" +) + // Client to operate remote slave server type Client interface { - CreateUploadSession(session *serializer.UploadSession, ttl int64) error + CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error + GetUploadURL(ttl int64, sessionID string) (string, string, error) } // NewClient creates new Client from given policy @@ -23,7 +31,7 @@ func NewClient(policy *model.Policy) (Client, error) { return nil, err } - base, _ := url.Parse("/api/v3/slave") + base, _ := url.Parse(basePath) signTTL := model.GetIntSetting("slave_api_timeout", 60) return &remoteClient{ @@ -43,7 +51,7 @@ type remoteClient struct { httpClient request.Client } -func (c *remoteClient) CreateUploadSession(session *serializer.UploadSession, ttl int64) error { +func (c *remoteClient) CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error { reqBodyEncoded, err := json.Marshal(map[string]interface{}{ "session": session, "ttl": ttl, @@ -57,6 +65,7 @@ func (c *remoteClient) CreateUploadSession(session *serializer.UploadSession, tt "PUT", "upload", bodyReader, + request.WithContext(ctx), ).CheckHTTPResponse(200).DecodeResponse() if err != nil { return err @@ -68,3 +77,22 @@ func (c *remoteClient) CreateUploadSession(session *serializer.UploadSession, tt return nil } + +func (c *remoteClient) GetUploadURL(ttl int64, sessionID string) (string, string, error) { + base, err := url.Parse(c.policy.BaseURL) + if err != nil { + return "", "", err + } + + base.Path = path.Join(base.Path, "upload", sessionID) + + req, err := http.NewRequest("POST", base.String(), nil) + if err != nil { + return "", "", err + } + + req.Header["X-Cr-Overwrite"] = []string{"true"} + + req = auth.SignRequest(c.authInstance, req, ttl) + return req.URL.String(), req.Header["Authorization"][0], nil +} diff --git a/pkg/filesystem/driver/remote/handler.go b/pkg/filesystem/driver/remote/handler.go index 01cf6fc..c68e8bd 100644 --- a/pkg/filesystem/driver/remote/handler.go +++ b/pkg/filesystem/driver/remote/handler.go @@ -323,29 +323,23 @@ func (handler Driver) Source( // Token 获取上传策略和认证Token func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { - if err := handler.client.CreateUploadSession(uploadSession, ttl); err != nil { + // 在从机端创建上传会话 + if err := handler.client.CreateUploadSession(ctx, uploadSession, ttl); err != nil { + return nil, err + } + + // 获取上传地址 + uploadURL, sign, err := handler.client.GetUploadURL(ttl, uploadSession.Key) + if err != nil { return nil, err } return &serializer.UploadCredential{ - SessionID: uploadSession.Key, - ChunkSize: handler.Policy.OptionsSerialized.ChunkSize, + SessionID: uploadSession.Key, + ChunkSize: handler.Policy.OptionsSerialized.ChunkSize, + UploadURLs: []string{uploadURL}, + Credential: sign, }, nil - //// 生成回调地址 - //siteURL := model.GetSiteURL() - //apiBaseURI, _ := url.Parse("/api/v3/callback/remote/" + uploadSession.Key) - //apiURL := siteURL.ResolveReference(apiBaseURI) - // - //// 生成上传策略 - //policy := serializer.UploadPolicy{ - // SavePath: handler.Policy.DirNameRule, - // FileName: handler.Policy.FileNameRule, - // AutoRename: handler.Policy.AutoRename, - // MaxSize: handler.Policy.MaxSize, - // AllowedExtension: handler.Policy.OptionsSerialized.FileType, - // CallbackURL: apiURL.String(), - //} - //return handler.getUploadCredential(ctx, policy, ttl) } func (handler Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) { diff --git a/pkg/filesystem/hooks.go b/pkg/filesystem/hooks.go index 8a8951a..f07b370 100644 --- a/pkg/filesystem/hooks.go +++ b/pkg/filesystem/hooks.go @@ -301,7 +301,7 @@ func HookChunkUploaded(ctx context.Context, fs *FileSystem, fileHeader fsctx.Fil fileInfo := fileHeader.Info() // 更新文件大小 - return fileInfo.Model.(*model.File).UpdateSize(fileInfo.Model.(*model.File).GetSize() + fileInfo.Size) + return fileInfo.Model.(*model.File).UpdateSize(fileInfo.AppendStart + fileInfo.Size) } // HookChunkUploadFailed 单个分片上传失败后 @@ -309,7 +309,7 @@ func HookChunkUploadFailed(ctx context.Context, fs *FileSystem, fileHeader fsctx fileInfo := fileHeader.Info() // 更新文件大小 - return fileInfo.Model.(*model.File).UpdateSize(fileInfo.Model.(*model.File).GetSize() - fileInfo.Size) + return fileInfo.Model.(*model.File).UpdateSize(fileInfo.AppendStart) } // HookChunkUploadFinished 分片上传结束后处理文件 diff --git a/pkg/serializer/upload.go b/pkg/serializer/upload.go index e13a5fe..b1f78a1 100644 --- a/pkg/serializer/upload.go +++ b/pkg/serializer/upload.go @@ -20,9 +20,11 @@ type UploadPolicy struct { // UploadCredential 返回给客户端的上传凭证 type UploadCredential struct { - SessionID string `json:"sessionID"` - ChunkSize uint64 `json:"chunkSize"` // 分块大小,0 为部分快 - Expires int64 `json:"expires"` // 上传凭证过期时间, Unix 时间戳 + SessionID string `json:"sessionID"` + ChunkSize uint64 `json:"chunkSize"` // 分块大小,0 为部分快 + Expires int64 `json:"expires"` // 上传凭证过期时间, Unix 时间戳 + UploadURLs []string `json:"uploadURLs"` + Credential string `json:"credential"` Token string `json:"token"` Policy string `json:"policy"` diff --git a/routers/router.go b/routers/router.go index 9008746..2b9052d 100644 --- a/routers/router.go +++ b/routers/router.go @@ -46,7 +46,7 @@ func InitSlaveRouter() *gin.Engine { // 接收主机心跳包 v3.POST("heartbeat", controllers.SlaveHeartbeat) // 上传 - v3.POST("upload", controllers.SlaveUpload) + v3.POST("upload/:sessionId", controllers.SlaveUpload) // 创建上传会话上传 v3.PUT("upload", controllers.SlaveGetUploadSession) // 下载 diff --git a/service/explorer/upload.go b/service/explorer/upload.go index 2a044b0..15cf426 100644 --- a/service/explorer/upload.go +++ b/service/explorer/upload.go @@ -66,10 +66,10 @@ func (service *CreateUploadSessionService) Create(ctx context.Context, c *gin.Co } } -// UploadService 本机策略上传服务 +// UploadService 本机及从机策略上传服务 type UploadService struct { ID string `uri:"sessionId" binding:"required"` - Index int `uri:"index" binding:"min=0"` + Index int `uri:"index" form:"index" binding:"min=0"` } // Upload 处理本机文件分片上传 @@ -117,8 +117,7 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial } if expectedSizeStart > actualSizeStart { - util.Log().Warning("尝试上传覆盖分片[%d],数据将被忽略", service.Index) - return serializer.Response{} + util.Log().Info("尝试上传覆盖分片[%d] Start=%d", service.Index, actualSizeStart) } return processChunkUpload(ctx, c, fs, &uploadSession, service.Index, file)