From 2811ee3285d0856fc0afe535097259c81948b5af Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sun, 27 Feb 2022 14:22:09 +0800 Subject: [PATCH] Feat: slave policy creating upload session API --- models/policy.go | 10 +-- pkg/filesystem/driver/cos/handler.go | 14 ++-- pkg/filesystem/driver/handler.go | 2 +- pkg/filesystem/driver/local/handler.go | 4 +- pkg/filesystem/driver/onedrive/handler.go | 8 +-- pkg/filesystem/driver/oss/handler.go | 12 ++-- pkg/filesystem/driver/qiniu/handler.go | 6 +- pkg/filesystem/driver/remote/client.go | 70 +++++++++++++++++++ pkg/filesystem/driver/remote/handler.go | 55 +++++++++++---- pkg/filesystem/driver/s3/handler.go | 8 +-- .../driver/shadow/masterinslave/handler.go | 4 +- .../driver/shadow/slaveinmaster/handler.go | 4 +- pkg/filesystem/driver/upyun/handler.go | 8 +-- pkg/filesystem/file.go | 12 +--- pkg/filesystem/filesystem.go | 14 ++-- pkg/filesystem/upload.go | 42 ++++++----- routers/controllers/file.go | 12 ++-- routers/controllers/slave.go | 15 ++++ routers/router.go | 8 ++- service/explorer/slave.go | 29 ++++++-- service/explorer/upload.go | 5 +- 21 files changed, 236 insertions(+), 106 deletions(-) create mode 100644 pkg/filesystem/driver/remote/client.go diff --git a/models/policy.go b/models/policy.go index ef1ce92..6671692 100644 --- a/models/policy.go +++ b/models/policy.go @@ -219,16 +219,16 @@ func (policy *Policy) IsTransitUpload(size uint64) bool { return policy.Type == "local" } -// IsPathGenerateNeeded 返回此策略是否需要在生成上传凭证时生成存储路径 -func (policy *Policy) IsPathGenerateNeeded() bool { - return policy.Type != "remote" -} - // IsThumbGenerateNeeded 返回此策略是否需要在上传后生成缩略图 func (policy *Policy) IsThumbGenerateNeeded() bool { return policy.Type == "local" } +// IsUploadPlaceholderWithSize 返回此策略创建上传会话时是否需要预留空间 +func (policy *Policy) IsUploadPlaceholderWithSize() bool { + return policy.Type == "remote" +} + // CanStructureBeListed 返回存储策略是否能被前台列物理目录 func (policy *Policy) CanStructureBeListed() bool { return policy.Type != "local" && policy.Type != "remote" diff --git a/pkg/filesystem/driver/cos/handler.go b/pkg/filesystem/driver/cos/handler.go index 7577ec4..cc55ac4 100644 --- a/pkg/filesystem/driver/cos/handler.go +++ b/pkg/filesystem/driver/cos/handler.go @@ -326,7 +326,7 @@ func (handler Driver) signSourceURL(ctx context.Context, path string, ttl int64, } // Token 获取上传策略和认证Token -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { // 生成回调地址 siteURL := model.GetSiteURL() apiBaseURI, _ := url.Parse("/api/v3/callback/cos/" + uploadSession.Key) @@ -383,11 +383,11 @@ func (handler Driver) Meta(ctx context.Context, path string) (*MetaData, error) }, nil } -func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy, keyTime string, savePath string) (serializer.UploadCredential, error) { +func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy, keyTime string, savePath string) (*serializer.UploadCredential, error) { // 编码上传策略 policyJSON, err := json.Marshal(policy) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } policyEncoded := base64.StdEncoding.EncodeToString(policyJSON) @@ -395,14 +395,14 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli hmacSign := hmac.New(sha1.New, []byte(handler.Policy.SecretKey)) _, err = io.WriteString(hmacSign, keyTime) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } signKey := fmt.Sprintf("%x", hmacSign.Sum(nil)) sha1Sign := sha1.New() _, err = sha1Sign.Write(policyJSON) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } stringToSign := fmt.Sprintf("%x", sha1Sign.Sum(nil)) @@ -410,11 +410,11 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli hmacFinalSign := hmac.New(sha1.New, []byte(signKey)) _, err = hmacFinalSign.Write([]byte(stringToSign)) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } signature := hmacFinalSign.Sum(nil) - return serializer.UploadCredential{ + return &serializer.UploadCredential{ Policy: policyEncoded, Path: savePath, AccessKey: handler.Policy.AccessKey, diff --git a/pkg/filesystem/driver/handler.go b/pkg/filesystem/driver/handler.go index 74c99cd..a1caaf4 100644 --- a/pkg/filesystem/driver/handler.go +++ b/pkg/filesystem/driver/handler.go @@ -30,7 +30,7 @@ type Handler interface { Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) // Token 获取有效期为ttl的上传凭证和签名 - Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) + Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) // CancelToken 取消已经创建的有状态上传凭证 CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error diff --git a/pkg/filesystem/driver/local/handler.go b/pkg/filesystem/driver/local/handler.go index c1742b5..b2b621a 100644 --- a/pkg/filesystem/driver/local/handler.go +++ b/pkg/filesystem/driver/local/handler.go @@ -254,8 +254,8 @@ func (handler Driver) Source( } // Token 获取上传策略和认证Token,本地策略直接返回空值 -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { - return serializer.UploadCredential{ +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { + return &serializer.UploadCredential{ SessionID: uploadSession.Key, ChunkSize: handler.Policy.OptionsSerialized.ChunkSize, }, nil diff --git a/pkg/filesystem/driver/onedrive/handler.go b/pkg/filesystem/driver/onedrive/handler.go index a5944f9..2fa85b3 100644 --- a/pkg/filesystem/driver/onedrive/handler.go +++ b/pkg/filesystem/driver/onedrive/handler.go @@ -223,12 +223,12 @@ func (handler Driver) replaceSourceHost(origin string) (string, error) { } // Token 获取上传会话URL -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { fileInfo := file.Info() // 如果小于4MB,则由服务端中转 if fileInfo.Size <= SmallFileSize { - return serializer.UploadCredential{}, nil + return nil, nil } // 生成回调地址 @@ -238,13 +238,13 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria uploadURL, err := handler.Client.CreateUploadSession(ctx, fileInfo.SavePath, WithConflictBehavior("fail")) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } // 监控回调及上传 go handler.Client.MonitorUpload(uploadURL, uploadSession.Key, fileInfo.SavePath, fileInfo.Size, ttl) - return serializer.UploadCredential{ + return &serializer.UploadCredential{ Policy: uploadURL, Token: apiURL.String(), }, nil diff --git a/pkg/filesystem/driver/oss/handler.go b/pkg/filesystem/driver/oss/handler.go index 2a229cd..bd4c736 100644 --- a/pkg/filesystem/driver/oss/handler.go +++ b/pkg/filesystem/driver/oss/handler.go @@ -398,7 +398,7 @@ func (handler Driver) signSourceURL(ctx context.Context, path string, ttl int64, } // Token 获取上传策略和认证Token -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { // 生成回调地址 siteURL := model.GetSiteURL() apiBaseURI, _ := url.Parse("/api/v3/callback/oss/" + uploadSession.Key) @@ -429,13 +429,13 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria return handler.getUploadCredential(ctx, postPolicy, callbackPolicy, ttl, savePath) } -func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy, callback CallbackPolicy, TTL int64, savePath string) (serializer.UploadCredential, error) { +func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy, callback CallbackPolicy, TTL int64, savePath string) (*serializer.UploadCredential, error) { // 处理回调策略 callbackPolicyEncoded := "" if callback.CallbackURL != "" { callbackPolicyJSON, err := json.Marshal(callback) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } callbackPolicyEncoded = base64.StdEncoding.EncodeToString(callbackPolicyJSON) policy.Conditions = append(policy.Conditions, map[string]string{"callback": callbackPolicyEncoded}) @@ -444,7 +444,7 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli // 编码上传策略 policyJSON, err := json.Marshal(policy) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } policyEncoded := base64.StdEncoding.EncodeToString(policyJSON) @@ -452,11 +452,11 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli hmacSign := hmac.New(sha1.New, []byte(handler.Policy.SecretKey)) _, err = io.WriteString(hmacSign, policyEncoded) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } signature := base64.StdEncoding.EncodeToString(hmacSign.Sum(nil)) - return serializer.UploadCredential{ + return &serializer.UploadCredential{ Policy: fmt.Sprintf("%s:%s", callbackPolicyEncoded, policyEncoded), Path: savePath, AccessKey: handler.Policy.AccessKey, diff --git a/pkg/filesystem/driver/qiniu/handler.go b/pkg/filesystem/driver/qiniu/handler.go index 94a2b8e..f453dd7 100644 --- a/pkg/filesystem/driver/qiniu/handler.go +++ b/pkg/filesystem/driver/qiniu/handler.go @@ -274,7 +274,7 @@ func (handler Driver) signSourceURL(ctx context.Context, path string, ttl int64) } // Token 获取上传策略和认证Token -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { // 生成回调地址 siteURL := model.GetSiteURL() apiBaseURI, _ := url.Parse("/api/v3/callback/qiniu/" + uploadSession.Key) @@ -299,12 +299,12 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria } // getUploadCredential 签名上传策略 -func (handler Driver) getUploadCredential(ctx context.Context, policy storage.PutPolicy, TTL int64) (serializer.UploadCredential, error) { +func (handler Driver) getUploadCredential(ctx context.Context, policy storage.PutPolicy, TTL int64) (*serializer.UploadCredential, error) { policy.Expires = uint64(TTL) mac := qbox.NewMac(handler.Policy.AccessKey, handler.Policy.SecretKey) upToken := policy.UploadToken(mac) - return serializer.UploadCredential{ + return &serializer.UploadCredential{ Token: upToken, }, nil } diff --git a/pkg/filesystem/driver/remote/client.go b/pkg/filesystem/driver/remote/client.go new file mode 100644 index 0000000..b2f7443 --- /dev/null +++ b/pkg/filesystem/driver/remote/client.go @@ -0,0 +1,70 @@ +package remote + +import ( + "encoding/json" + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/auth" + "github.com/cloudreve/Cloudreve/v3/pkg/request" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "net/url" + "strings" +) + +// Client to operate remote slave server +type Client interface { + CreateUploadSession(session *serializer.UploadSession, ttl int64) error +} + +// NewClient creates new Client from given policy +func NewClient(policy *model.Policy) (Client, error) { + authInstance := auth.HMACAuth{[]byte(policy.SecretKey)} + serverURL, err := url.Parse(policy.Server) + if err != nil { + return nil, err + } + + base, _ := url.Parse("/api/v3/slave") + signTTL := model.GetIntSetting("slave_api_timeout", 60) + + return &remoteClient{ + policy: policy, + authInstance: authInstance, + httpClient: request.NewClient( + request.WithEndpoint(serverURL.ResolveReference(base).String()), + request.WithCredential(authInstance, int64(signTTL)), + request.WithMasterMeta(), + ), + }, nil +} + +type remoteClient struct { + policy *model.Policy + authInstance auth.Auth + httpClient request.Client +} + +func (c *remoteClient) CreateUploadSession(session *serializer.UploadSession, ttl int64) error { + reqBodyEncoded, err := json.Marshal(map[string]interface{}{ + "session": session, + "ttl": ttl, + }) + if err != nil { + return err + } + + bodyReader := strings.NewReader(string(reqBodyEncoded)) + resp, err := c.httpClient.Request( + "PUT", + "upload", + bodyReader, + ).CheckHTTPResponse(200).DecodeResponse() + if err != nil { + return err + } + + if resp.Code != 0 { + return serializer.NewErrorFromResponse(resp) + } + + return nil +} diff --git a/pkg/filesystem/driver/remote/handler.go b/pkg/filesystem/driver/remote/handler.go index 0eb2b74..01cf6fc 100644 --- a/pkg/filesystem/driver/remote/handler.go +++ b/pkg/filesystem/driver/remote/handler.go @@ -25,6 +25,23 @@ type Driver struct { Client request.Client Policy *model.Policy AuthInstance auth.Auth + + client Client +} + +// NewDriver initializes a new Driver from policy +func NewDriver(policy *model.Policy) (*Driver, error) { + client, err := NewClient(policy) + if err != nil { + return nil, err + } + + return &Driver{ + Policy: policy, + Client: request.NewClient(), + AuthInstance: auth.HMACAuth{[]byte(policy.SecretKey)}, + client: client, + }, nil } // List 列取文件 @@ -305,22 +322,30 @@ func (handler Driver) Source( } // Token 获取上传策略和认证Token -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { - // 生成回调地址 - siteURL := model.GetSiteURL() - apiBaseURI, _ := url.Parse("/api/v3/callback/remote/" + uploadSession.Key) - apiURL := siteURL.ResolveReference(apiBaseURI) +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { + if err := handler.client.CreateUploadSession(uploadSession, ttl); err != nil { + return nil, err + } - // 生成上传策略 - policy := serializer.UploadPolicy{ - SavePath: handler.Policy.DirNameRule, - FileName: handler.Policy.FileNameRule, - AutoRename: handler.Policy.AutoRename, - MaxSize: handler.Policy.MaxSize, - AllowedExtension: handler.Policy.OptionsSerialized.FileType, - CallbackURL: apiURL.String(), - } - return handler.getUploadCredential(ctx, policy, ttl) + return &serializer.UploadCredential{ + SessionID: uploadSession.Key, + ChunkSize: handler.Policy.OptionsSerialized.ChunkSize, + }, nil + //// 生成回调地址 + //siteURL := model.GetSiteURL() + //apiBaseURI, _ := url.Parse("/api/v3/callback/remote/" + uploadSession.Key) + //apiURL := siteURL.ResolveReference(apiBaseURI) + // + //// 生成上传策略 + //policy := serializer.UploadPolicy{ + // SavePath: handler.Policy.DirNameRule, + // FileName: handler.Policy.FileNameRule, + // AutoRename: handler.Policy.AutoRename, + // MaxSize: handler.Policy.MaxSize, + // AllowedExtension: handler.Policy.OptionsSerialized.FileType, + // CallbackURL: apiURL.String(), + //} + //return handler.getUploadCredential(ctx, policy, ttl) } func (handler Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) { diff --git a/pkg/filesystem/driver/s3/handler.go b/pkg/filesystem/driver/s3/handler.go index be72308..120bcf1 100644 --- a/pkg/filesystem/driver/s3/handler.go +++ b/pkg/filesystem/driver/s3/handler.go @@ -325,7 +325,7 @@ func (handler Driver) Source( } // Token 获取上传策略和认证Token -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { // 生成回调地址 siteURL := model.GetSiteURL() apiBaseURI, _ := url.Parse("/api/v3/callback/s3/" + uploadSession.Key) @@ -378,7 +378,7 @@ func (handler Driver) Meta(ctx context.Context, path string) (*MetaData, error) } -func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy, callback *url.URL, savePath string) (serializer.UploadCredential, error) { +func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy, callback *url.URL, savePath string) (*serializer.UploadCredential, error) { longDate := time.Now().UTC().Format("20060102T150405Z") shortDate := time.Now().UTC().Format("20060102") @@ -390,7 +390,7 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli // 编码上传策略 policyJSON, err := json.Marshal(policy) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } policyEncoded := base64.StdEncoding.EncodeToString(policyJSON) @@ -401,7 +401,7 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli signature = getHMAC(signature, []byte("aws4_request")) signature = getHMAC(signature, []byte(policyEncoded)) - return serializer.UploadCredential{ + return &serializer.UploadCredential{ Policy: policyEncoded, Callback: callback.String(), Token: hex.EncodeToString(signature), diff --git a/pkg/filesystem/driver/shadow/masterinslave/handler.go b/pkg/filesystem/driver/shadow/masterinslave/handler.go index cc3853c..0356ae3 100644 --- a/pkg/filesystem/driver/shadow/masterinslave/handler.go +++ b/pkg/filesystem/driver/shadow/masterinslave/handler.go @@ -47,8 +47,8 @@ func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64 return "", ErrNotImplemented } -func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { - return serializer.UploadCredential{}, ErrNotImplemented +func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { + return nil, ErrNotImplemented } func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { diff --git a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go index 4b495c9..9a79a1a 100644 --- a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go @@ -113,8 +113,8 @@ func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64 return "", ErrNotImplemented } -func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { - return serializer.UploadCredential{}, ErrNotImplemented +func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { + return nil, ErrNotImplemented } func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { diff --git a/pkg/filesystem/driver/upyun/handler.go b/pkg/filesystem/driver/upyun/handler.go index 5c03e5e..ab03396 100644 --- a/pkg/filesystem/driver/upyun/handler.go +++ b/pkg/filesystem/driver/upyun/handler.go @@ -310,7 +310,7 @@ func (handler Driver) signURL(ctx context.Context, path *url.URL, TTL int64) (st } // Token 获取上传策略和认证Token -func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) { +func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { // 检查文件大小 // 生成回调地址 @@ -340,11 +340,11 @@ func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer return nil } -func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy) (serializer.UploadCredential, error) { +func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy) (*serializer.UploadCredential, error) { // 生成上传策略 policyJSON, err := json.Marshal(policy) if err != nil { - return serializer.UploadCredential{}, err + return nil, err } policyEncoded := base64.StdEncoding.EncodeToString(policyJSON) @@ -352,7 +352,7 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli elements := []string{"POST", "/" + handler.Policy.BucketName, policyEncoded} signStr := handler.Sign(ctx, elements) - return serializer.UploadCredential{ + return &serializer.UploadCredential{ Policy: policyEncoded, Token: signStr, }, nil diff --git a/pkg/filesystem/file.go b/pkg/filesystem/file.go index 9bc2e0f..491274f 100644 --- a/pkg/filesystem/file.go +++ b/pkg/filesystem/file.go @@ -179,16 +179,9 @@ func (fs *FileSystem) deleteGroupedFile(ctx context.Context, files map[uint][]*m for policyID, toBeDeletedFiles := range files { // 列举出需要物理删除的文件的物理路径 sourceNamesAll := make([]string, 0, len(toBeDeletedFiles)) - sourceNamesDeleted := make([]string, 0, len(toBeDeletedFiles)) - sourceNamesTryDeleted := make([]string, 0, len(toBeDeletedFiles)) for i := 0; i < len(toBeDeletedFiles); i++ { sourceNamesAll = append(sourceNamesAll, toBeDeletedFiles[i].SourceName) - if !(toBeDeletedFiles[i].UploadSessionID != nil && toBeDeletedFiles[i].Size == 0) { - sourceNamesDeleted = append(sourceNamesDeleted, toBeDeletedFiles[i].SourceName) - } else { - sourceNamesTryDeleted = append(sourceNamesTryDeleted, toBeDeletedFiles[i].SourceName) - } if toBeDeletedFiles[i].UploadSessionID != nil { if session, ok := cache.Get(UploadSessionCachePrefix + *toBeDeletedFiles[i].UploadSessionID); ok { @@ -212,11 +205,8 @@ func (fs *FileSystem) deleteGroupedFile(ctx context.Context, files map[uint][]*m } // 执行删除 - failedFile, _ := fs.Handler.Delete(ctx, sourceNamesDeleted) + failedFile, _ := fs.Handler.Delete(ctx, sourceNamesAll) failed[policyID] = failedFile - - // 尝试删除上传会话中大小为0的占位文件。如果失败也忽略 - fs.Handler.Delete(ctx, sourceNamesTryDeleted) } return failed diff --git a/pkg/filesystem/filesystem.go b/pkg/filesystem/filesystem.go index 8b000f0..5c12ad1 100644 --- a/pkg/filesystem/filesystem.go +++ b/pkg/filesystem/filesystem.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" model "github.com/cloudreve/Cloudreve/v3/models" - "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" @@ -138,12 +137,12 @@ func (fs *FileSystem) DispatchHandler() error { } return nil case "remote": - fs.Handler = remote.Driver{ - Policy: currentPolicy, - Client: request.NewClient(), - AuthInstance: auth.HMACAuth{[]byte(currentPolicy.SecretKey)}, + handler, err := remote.NewDriver(currentPolicy) + if err != nil { + return err } - return nil + + fs.Handler = handler case "qiniu": fs.Handler = qiniu.Driver{ Policy: currentPolicy, @@ -186,6 +185,8 @@ func (fs *FileSystem) DispatchHandler() error { default: return ErrUnknownPolicyType } + + return nil } // NewFileSystemFromContext 从gin.Context创建文件系统 @@ -214,7 +215,6 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) { // 重新指向上传策略 fs.Policy = &callbackSession.Policy - fs.User.Policy = policy err = fs.DispatchHandler() return fs, err diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go index d00805d..a8b2716 100644 --- a/pkg/filesystem/upload.go +++ b/pkg/filesystem/upload.go @@ -159,26 +159,11 @@ func (fs *FileSystem) CancelUpload(ctx context.Context, path string, file fsctx. // CreateUploadSession 创建上传会话 func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileStream) (*serializer.UploadCredential, error) { // 获取相关有效期设置 - credentialTTL := model.GetIntSetting("upload_credential_timeout", 3600) callBackSessionTTL := model.GetIntSetting("upload_session_timeout", 86400) callbackKey := uuid.Must(uuid.NewV4()).String() fileSize := file.Size - // 创建占位的文件,同时校验文件信息 - file.Mode = fsctx.Nop - if callbackKey != "" { - file.UploadSessionID = &callbackKey - } - - fs.Use("BeforeUpload", HookValidateFile) - fs.Use("AfterUpload", HookClearFileHeaderSize) - // TODO: 只有本机策略才添加文件 - fs.Use("AfterUpload", GenericAfterUpload) - if err := fs.Upload(ctx, file); err != nil { - return nil, err - } - uploadSession := &serializer.UploadSession{ Key: callbackKey, UID: fs.User.ID, @@ -191,9 +176,30 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS } // 获取上传凭证 - credential, err := fs.Handler.Token(ctx, int64(credentialTTL), uploadSession, file) + credential, err := fs.Handler.Token(ctx, int64(callBackSessionTTL), uploadSession, file) if err != nil { - return nil, serializer.NewError(serializer.CodeEncryptError, "无法获取上传凭证", err) + return nil, err + } + + // 创建占位的文件,同时校验文件信息 + file.Mode = fsctx.Nop + if callbackKey != "" { + file.UploadSessionID = &callbackKey + } + + fs.Use("BeforeUpload", HookValidateFile) + if !fs.Policy.IsUploadPlaceholderWithSize() { + fs.Use("BeforeUpload", HookValidateCapacityWithoutIncrease) + fs.Use("AfterUpload", HookClearFileHeaderSize) + } else { + fs.Use("BeforeUpload", HookValidateCapacity) + fs.Use("AfterValidateFailed", HookGiveBackCapacity) + fs.Use("AfterUploadFailed", HookGiveBackCapacity) + } + + fs.Use("AfterUpload", GenericAfterUpload) + if err := fs.Upload(ctx, file); err != nil { + return nil, err } // 创建回调会话 @@ -209,7 +215,7 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS // 补全上传凭证其他信息 credential.Expires = time.Now().Add(time.Duration(callBackSessionTTL) * time.Second).Unix() - return &credential, nil + return credential, nil } // UploadFromStream 从文件流上传文件 diff --git a/routers/controllers/file.go b/routers/controllers/file.go index 78c60b5..cb3fae8 100644 --- a/routers/controllers/file.go +++ b/routers/controllers/file.go @@ -343,8 +343,8 @@ func FileUpload(c *gin.Context) { //}) } -// DeleteUploadCredential 删除上传会话 -func DeleteUploadCredential(c *gin.Context) { +// DeleteUploadSession 删除上传会话 +func DeleteUploadSession(c *gin.Context) { // 创建上下文 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -358,8 +358,8 @@ func DeleteUploadCredential(c *gin.Context) { } } -// DeleteAllCredential 删除全部上传会话 -func DeleteAllCredential(c *gin.Context) { +// DeleteAllUploadSession 删除全部上传会话 +func DeleteAllUploadSession(c *gin.Context) { // 创建上下文 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -368,8 +368,8 @@ func DeleteAllCredential(c *gin.Context) { c.JSON(200, res) } -// GetUploadCredential 创建上传会话 -func GetUploadCredential(c *gin.Context) { +// GetUploadSession 创建上传会话 +func GetUploadSession(c *gin.Context) { // 创建上下文 ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/routers/controllers/slave.go b/routers/controllers/slave.go index 4c62ac6..66d712d 100644 --- a/routers/controllers/slave.go +++ b/routers/controllers/slave.go @@ -90,6 +90,21 @@ func SlaveUpload(c *gin.Context) { }) } +// SlaveGetUploadSession 从机创建上传会话 +func SlaveGetUploadSession(c *gin.Context) { + // 创建上下文 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var service explorer.SlaveCreateUploadSessionService + if err := c.ShouldBindJSON(&service); err == nil { + res := service.Create(ctx, c) + c.JSON(200, res) + } else { + c.JSON(200, ErrorResponse(err)) + } +} + // SlaveDownload 从机文件下载,此请求返回的HTTP状态码不全为200 func SlaveDownload(c *gin.Context) { // 创建上下文 diff --git a/routers/router.go b/routers/router.go index 9baba22..9008746 100644 --- a/routers/router.go +++ b/routers/router.go @@ -47,6 +47,8 @@ func InitSlaveRouter() *gin.Engine { v3.POST("heartbeat", controllers.SlaveHeartbeat) // 上传 v3.POST("upload", controllers.SlaveUpload) + // 创建上传会话上传 + v3.PUT("upload", controllers.SlaveGetUploadSession) // 下载 v3.GET("download/:speed/:path/:name", controllers.SlaveDownload) // 预览 / 外链 @@ -510,11 +512,11 @@ func InitMasterRouter() *gin.Engine { // 文件上传 upload.POST(":sessionId/:index", controllers.FileUpload) // 创建上传会话 - upload.PUT("", controllers.GetUploadCredential) + upload.PUT("", controllers.GetUploadSession) // 删除给定上传会话 - upload.DELETE(":sessionId", controllers.DeleteUploadCredential) + upload.DELETE(":sessionId", controllers.DeleteUploadSession) // 删除全部上传会话 - upload.DELETE("", controllers.DeleteAllCredential) + upload.DELETE("", controllers.DeleteAllUploadSession) } // 更新文件 file.PUT("update/:id", controllers.PutContent) diff --git a/service/explorer/slave.go b/service/explorer/slave.go index 54638ee..0cd45aa 100644 --- a/service/explorer/slave.go +++ b/service/explorer/slave.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" @@ -87,9 +88,7 @@ func (service *SlaveDownloadService) ServeFile(ctx context.Context, c *gin.Conte // 发送文件 http.ServeContent(c.Writer, c.Request, fs.FileTarget[0].Name, time.Now(), rs) - return serializer.Response{ - Code: 0, - } + return serializer.Response{} } // Delete 通过签名的URL删除从机文件 @@ -114,7 +113,7 @@ func (service *SlaveFilesService) Delete(ctx context.Context, c *gin.Context) se Error: err.Error(), } } - return serializer.Response{Code: 0} + return serializer.Response{} } // Thumb 通过签名URL获取从机文件缩略图 @@ -142,7 +141,7 @@ func (service *SlaveFileService) Thumb(ctx context.Context, c *gin.Context) seri defer resp.Content.Close() http.ServeContent(c.Writer, c.Request, "thumb.png", time.Now(), resp.Content) - return serializer.Response{Code: 0} + return serializer.Response{} } // CreateTransferTask 创建从机文件转存任务 @@ -164,3 +163,23 @@ func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serial return serializer.ParamErr("未知的主机节点ID", nil) } + +// SlaveListService 从机上传会话服务 +type SlaveCreateUploadSessionService struct { + Session serializer.UploadSession `json:"session" binding:"required"` + TTL int64 `json:"ttl"` +} + +// Create 从机创建上传会话 +func (service *SlaveCreateUploadSessionService) Create(ctx context.Context, c *gin.Context) serializer.Response { + err := cache.Set( + filesystem.UploadSessionCachePrefix+service.Session.Key, + service.Session, + int(service.TTL), + ) + if err != nil { + return serializer.Err(serializer.CodeCacheOperation, "Failed to create upload session in slave node", err) + } + + return serializer.Response{} +} diff --git a/service/explorer/upload.go b/service/explorer/upload.go index e9f9b89..4cfce8c 100644 --- a/service/explorer/upload.go +++ b/service/explorer/upload.go @@ -12,7 +12,9 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/util" "github.com/gin-gonic/gin" + "io/ioutil" "strconv" + "strings" "sync" "time" ) @@ -48,6 +50,7 @@ func (service *CreateUploadSessionService) Create(ctx context.Context, c *gin.Co Size: service.Size, Name: service.Name, VirtualPath: service.Path, + File: ioutil.NopCloser(strings.NewReader("")), } if service.LastModified > 0 { lastModified := time.UnixMilli(service.LastModified) @@ -132,7 +135,7 @@ func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.File } fileSize, err := strconv.ParseUint(c.Request.Header.Get("Content-Length"), 10, 64) - if err != nil || fileSize == 0 || (expectedLength != fileSize) { + if err != nil || (expectedLength != fileSize) { return serializer.Err( serializer.CodeInvalidContentLength, fmt.Sprintf("Invalid Content-Length (expected: %d)", expectedLength),