diff --git a/pkg/filesystem/chunk/backoff/backoff.go b/pkg/filesystem/chunk/backoff/backoff.go index 0b395f1..d15b975 100644 --- a/pkg/filesystem/chunk/backoff/backoff.go +++ b/pkg/filesystem/chunk/backoff/backoff.go @@ -18,7 +18,7 @@ type ConstantBackoff struct { func (c *ConstantBackoff) Next() bool { c.tried++ - if c.tried >= c.Max { + if c.tried > c.Max { return false } diff --git a/pkg/filesystem/chunk/chunk.go b/pkg/filesystem/chunk/chunk.go index dc98fb4..e8a63f7 100644 --- a/pkg/filesystem/chunk/chunk.go +++ b/pkg/filesystem/chunk/chunk.go @@ -49,7 +49,7 @@ func (c *ChunkGroup) Process(processor ChunkProcessFunc) error { err := processor(c, io.LimitReader(c.file, int64(c.chunkSize))) if err != nil { if err != context.Canceled && c.file.Seekable() && c.backoff.Next() { - if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); err != nil { + if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil { return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err) } diff --git a/pkg/filesystem/driver/remote/client.go b/pkg/filesystem/driver/remote/client.go index f052337..a0a5642 100644 --- a/pkg/filesystem/driver/remote/client.go +++ b/pkg/filesystem/driver/remote/client.go @@ -11,6 +11,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/util" "github.com/gofrs/uuid" "io" "net/http" @@ -26,7 +27,7 @@ const ( chunkRetrySleep = time.Duration(5) * time.Second ) -// Client to operate remote slave server +// Client to operate uploading to remote slave server type Client interface { // CreateUploadSession creates remote upload session CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error @@ -34,6 +35,8 @@ type Client interface { GetUploadURL(ttl int64, sessionID string) (string, string, error) // Upload uploads file to remote server Upload(ctx context.Context, file fsctx.FileHeader) error + // DeleteUploadSession deletes remote upload session + DeleteUploadSession(ctx context.Context, sessionID string) error } // NewClient creates new Client from given policy @@ -84,7 +87,7 @@ func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error overwrite := fileInfo.Mode&fsctx.Overwrite == fsctx.Overwrite - // Upload chunks + // Initial chunk groups chunks := chunk.NewChunkGroup(file, c.policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{ Max: model.GetIntSetting("onedrive_chunk_retries", 1), Sleep: chunkRetrySleep, @@ -94,9 +97,13 @@ func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error return c.uploadChunk(ctx, session.Key, current.Index(), content, overwrite, current.Length()) } + // upload chunks for chunks.Next() { if err := chunks.Process(uploadFunc); err != nil { - // TODO 删除上传会话 + if err := c.DeleteUploadSession(ctx, session.Key); err != nil { + util.Log().Warning("failed to delete upload session: %s", err) + } + return fmt.Errorf("failed to upload chunk #%d: %w", chunks.Index(), err) } } @@ -104,6 +111,24 @@ func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error return nil } +func (c *remoteClient) DeleteUploadSession(ctx context.Context, sessionID string) error { + resp, err := c.httpClient.Request( + "DELETE", + "upload/"+sessionID, + nil, + request.WithContext(ctx), + ).CheckHTTPResponse(200).DecodeResponse() + if err != nil { + return err + } + + if resp.Code != 0 { + return serializer.NewErrorFromResponse(resp) + } + + return nil +} + func (c *remoteClient) CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error { reqBodyEncoded, err := json.Marshal(map[string]interface{}{ "session": session, diff --git a/pkg/filesystem/driver/remote/handler.go b/pkg/filesystem/driver/remote/handler.go index 6fe1c46..8837439 100644 --- a/pkg/filesystem/driver/remote/handler.go +++ b/pkg/filesystem/driver/remote/handler.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "net/url" "path" "strings" @@ -26,10 +25,11 @@ type Driver struct { Policy *model.Policy AuthInstance auth.Auth - client Client + uploadClient Client } // NewDriver initializes a new Driver from policy +// TODO: refactor all method into upload client func NewDriver(policy *model.Policy) (*Driver, error) { client, err := NewClient(policy) if err != nil { @@ -40,7 +40,7 @@ func NewDriver(policy *model.Policy) (*Driver, error) { Policy: policy, Client: request.NewClient(), AuthInstance: auth.HMACAuth{[]byte(policy.SecretKey)}, - client: client, + uploadClient: client, }, nil } @@ -153,7 +153,7 @@ func (handler *Driver) Get(ctx context.Context, path string) (response.RSCloser, func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error { defer file.Close() - return handler.client.Upload(ctx, file) + return handler.uploadClient.Upload(ctx, file) } // Delete 删除一个或多个文件, @@ -281,12 +281,12 @@ func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *seri // 在从机端创建上传会话 uploadSession.Callback = apiURL.String() - if err := handler.client.CreateUploadSession(ctx, uploadSession, ttl); err != nil { + if err := handler.uploadClient.CreateUploadSession(ctx, uploadSession, ttl); err != nil { return nil, err } // 获取上传地址 - uploadURL, sign, err := handler.client.GetUploadURL(ttl, uploadSession.Key) + uploadURL, sign, err := handler.uploadClient.GetUploadURL(ttl, uploadSession.Key) if err != nil { return nil, fmt.Errorf("failed to sign upload url: %w", err) } @@ -299,30 +299,7 @@ func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *seri }, nil } -func (handler *Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) { - policyEncoded, err := policy.EncodeUploadPolicy() - if err != nil { - return serializer.UploadCredential{}, err - } - - // 签名上传策略 - uploadRequest, _ := http.NewRequest("POST", "/api/v3/slave/upload", nil) - uploadRequest.Header = map[string][]string{ - "X-Cr-Policy": {policyEncoded}, - "X-Cr-Overwrite": {"false"}, - } - auth.SignRequest(handler.AuthInstance, uploadRequest, TTL) - - if credential, ok := uploadRequest.Header["Authorization"]; ok && len(credential) == 1 { - return serializer.UploadCredential{ - Token: credential[0], - Policy: policyEncoded, - }, nil - } - return serializer.UploadCredential{}, errors.New("无法签名上传策略") -} - // 取消上传凭证 func (handler *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { - return nil + return handler.uploadClient.DeleteUploadSession(ctx, uploadSession.Key) } diff --git a/routers/controllers/slave.go b/routers/controllers/slave.go index 0b18f91..5a0d277 100644 --- a/routers/controllers/slave.go +++ b/routers/controllers/slave.go @@ -28,77 +28,6 @@ func SlaveUpload(c *gin.Context) { } else { c.JSON(200, ErrorResponse(err)) } - - //// 创建上下文 - //ctx, cancel := context.WithCancel(context.Background()) - //ctx = context.WithValue(ctx, fsctx.GinCtx, c) - //defer cancel() - // - //// 创建匿名文件系统 - //fs, err := filesystem.NewAnonymousFileSystem() - //if err != nil { - // c.JSON(200, serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)) - // return - //} - //fs.Handler = local.Driver{} - // - //// 从请求中取得上传策略 - //uploadPolicyRaw := c.GetHeader("X-Cr-Policy") - //if uploadPolicyRaw == "" { - // c.JSON(200, serializer.ParamErr("未指定上传策略", nil)) - // return - //} - // - //// 解析上传策略 - //uploadPolicy, err := serializer.DecodeUploadPolicy(uploadPolicyRaw) - //if err != nil { - // c.JSON(200, serializer.ParamErr("上传策略格式有误", err)) - // return - //} - //ctx = context.WithValue(ctx, fsctx.UploadPolicyCtx, *uploadPolicy) - // - //// 取得文件大小 - //fileSize, err := strconv.ParseUint(c.Request.Header.Get("Content-Length"), 10, 64) - //if err != nil { - // c.JSON(200, ErrorResponse(err)) - // return - //} - // - //// 解码文件名和路径 - //fileName, err := url.QueryUnescape(c.Request.Header.Get("X-Cr-FileName")) - //if err != nil { - // c.JSON(200, ErrorResponse(err)) - // return - //} - // - //fileData := fsctx.FileStream{ - // MIMEType: c.Request.Header.Get("Content-Type"), - // File: c.Request.Body, - // Name: fileName, - // Size: fileSize, - //} - // - //// 给文件系统分配钩子 - //fs.Use("BeforeUpload", filesystem.HookSlaveUploadValidate) - //fs.Use("AfterUploadCanceled", filesystem.HookDeleteTempFile) - //fs.Use("AfterUpload", filesystem.SlaveAfterUpload) - //fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile) - // - ////// 是否允许覆盖 - ////if c.Request.Header.Get("X-Cr-Overwrite") == "false" { - //// fileData.Mode = fsctx.Create - ////} - // - //// 执行上传 - //err = fs.LocalUpload(ctx, &fileData) - //if err != nil { - // c.JSON(200, serializer.Err(serializer.CodeUploadFailed, err.Error(), err)) - // return - //} - // - //c.JSON(200, serializer.Response{ - // Code: 0, - //}) } // SlaveGetUploadSession 从机创建上传会话 @@ -116,6 +45,21 @@ func SlaveGetUploadSession(c *gin.Context) { } } +// SlaveDeleteUploadSession 从机删除上传会话 +func SlaveDeleteUploadSession(c *gin.Context) { + // 创建上下文 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var service explorer.UploadSessionService + if err := c.ShouldBindUri(&service); err == nil { + res := service.SlaveDelete(ctx, c) + c.JSON(200, res) + } else { + c.JSON(200, ErrorResponse(err)) + } +} + // SlaveDownload 从机文件下载,此请求返回的HTTP状态码不全为200 func SlaveDownload(c *gin.Context) { // 创建上下文 diff --git a/routers/router.go b/routers/router.go index bf8eda1..99c6025 100644 --- a/routers/router.go +++ b/routers/router.go @@ -46,9 +46,15 @@ func InitSlaveRouter() *gin.Engine { // 接收主机心跳包 v3.POST("heartbeat", controllers.SlaveHeartbeat) // 上传 - v3.POST("upload/:sessionId", controllers.SlaveUpload) - // 创建上传会话上传 - v3.PUT("upload", controllers.SlaveGetUploadSession) + upload := v3.Group("upload") + { + // 上传分片 + upload.POST(":sessionId", controllers.SlaveUpload) + // 创建上传会话上传 + upload.PUT("", controllers.SlaveGetUploadSession) + // 删除上传会话 + upload.DELETE(":sessionId", controllers.SlaveDeleteUploadSession) + } // 下载 v3.GET("download/:speed/:path/:name", controllers.SlaveDownload) // 预览 / 外链 diff --git a/service/explorer/upload.go b/service/explorer/upload.go index f7e3d07..5753c2b 100644 --- a/service/explorer/upload.go +++ b/service/explorer/upload.go @@ -230,7 +230,7 @@ func (service *UploadSessionService) Delete(ctx context.Context, c *gin.Context) // 查找需要删除的上传会话的占位文件 file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID) if err != nil { - return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session file placeholder not exist", err) + return serializer.Err(serializer.CodeUploadSessionExpired, "Local Upload session file placeholder not exist", err) } // 删除文件 @@ -241,6 +241,28 @@ func (service *UploadSessionService) Delete(ctx context.Context, c *gin.Context) return serializer.Response{} } +// SlaveDelete 从机删除指定上传会话 +func (service *UploadSessionService) SlaveDelete(ctx context.Context, c *gin.Context) serializer.Response { + // 创建文件系统 + fs, err := filesystem.NewAnonymousFileSystem() + if err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) + } + defer fs.Recycle() + + session, ok := cache.Get(filesystem.UploadSessionCachePrefix + service.ID) + if !ok { + return serializer.Err(serializer.CodeUploadSessionExpired, "Slave Upload session file placeholder not exist", nil) + } + + if _, err := fs.Handler.Delete(ctx, []string{session.(serializer.UploadSession).SavePath}); err != nil { + return serializer.Err(serializer.CodeInternalSetting, "Failed to delete temp file", err) + } + + cache.Deletes([]string{service.ID}, filesystem.UploadSessionCachePrefix) + return serializer.Response{} +} + // DeleteAllUploadSession 删除当前用户的全部上传绘会话 func DeleteAllUploadSession(ctx context.Context, c *gin.Context) serializer.Response { // 创建文件系统