diff --git a/middleware/auth.go b/middleware/auth.go index 83f972b..34237d5 100644 --- a/middleware/auth.go +++ b/middleware/auth.go @@ -2,6 +2,7 @@ package middleware import ( "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" + "github.com/cloudreve/Cloudreve/v3/pkg/mq" "net/http" model "github.com/cloudreve/Cloudreve/v3/models" @@ -284,19 +285,10 @@ func UpyunCallbackAuth() gin.HandlerFunc { } // OneDriveCallbackAuth OneDrive回调签名验证 -// TODO 解耦 func OneDriveCallbackAuth() gin.HandlerFunc { return func(c *gin.Context) { - //// 验证key并查找用户 - //resp, _ := uploadCallbackCheck(c) - //if resp.Code != 0 { - // c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg}) - // c.Abort() - // return - //} - // - //// 发送回调结束信号 - //onedrive.FinishCallback(c.Param("key")) + // 发送回调结束信号 + mq.GlobalMQ.Publish(c.Param("sessionID"), mq.Message{}) c.Next() } diff --git a/models/migration.go b/models/migration.go index 1053e65..6b557fc 100644 --- a/models/migration.go +++ b/models/migration.go @@ -124,8 +124,8 @@ func addDefaultSettings() { {Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"}, {Name: "onedrive_callback_check", Value: `20`, Type: "timeout"}, {Name: "folder_props_timeout", Value: `300`, Type: "timeout"}, - {Name: "onedrive_chunk_retries", Value: `1`, Type: "retry"}, - {Name: "slave_chunk_retries", Value: `1`, Type: "retry"}, + {Name: "onedrive_chunk_retries", Value: `5`, Type: "retry"}, + {Name: "slave_chunk_retries", Value: `5`, Type: "retry"}, {Name: "onedrive_source_timeout", Value: `1800`, Type: "timeout"}, {Name: "reset_after_upload_failed", Value: `0`, Type: "upload"}, {Name: "login_captcha", Value: `0`, Type: "login"}, diff --git a/pkg/filesystem/driver/onedrive/api.go b/pkg/filesystem/driver/onedrive/api.go index c9e7ff2..1b6d9e1 100644 --- a/pkg/filesystem/driver/onedrive/api.go +++ b/pkg/filesystem/driver/onedrive/api.go @@ -19,6 +19,7 @@ import ( model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" + "github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/util" ) @@ -487,9 +488,9 @@ func (client *Client) GetThumbURL(ctx context.Context, dst string, w, h uint) (s // MonitorUpload 监控客户端分片上传进度 func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size uint64, ttl int64) { // 回调完成通知chan - callbackChan := make(chan bool) - callbackSignal.Store(callbackKey, callbackChan) - defer callbackSignal.Delete(callbackKey) + callbackChan := mq.GlobalMQ.Subscribe(callbackKey, 1) + defer mq.GlobalMQ.Unsubscribe(callbackKey, callbackChan) + timeout := model.GetIntSetting("onedrive_monitor_timeout", 600) interval := model.GetIntSetting("onedrive_callback_check", 20) @@ -514,16 +515,16 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui if resErr, ok := err.(*RespError); ok { if resErr.APIError.Code == "itemNotFound" { util.Log().Debug("上传会话已完成,稍后检查回调") - time.Sleep(time.Duration(interval) * time.Second) - util.Log().Debug("开始检查回调") - _, ok := cache.Get("callback_" + callbackKey) - if ok { + select { + case <-time.After(time.Duration(interval) * time.Second): util.Log().Warning("未发送回调,删除文件") cache.Deletes([]string{callbackKey}, "callback_") _, err = client.Delete(context.Background(), []string{path}) if err != nil { util.Log().Warning("无法删除未回调的文件,%s", err) } + case <-callbackChan: + util.Log().Debug("客户端完成回调") } return } @@ -560,15 +561,6 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui } } -// FinishCallback 向Monitor发送回调结束信号 -func FinishCallback(key string) { - if signal, ok := callbackSignal.Load(key); ok { - if signalChan, ok := signal.(chan bool); ok { - close(signalChan) - } - } -} - func sysError(err error) *RespError { return &RespError{APIError: APIError{ Code: "system", diff --git a/pkg/filesystem/driver/onedrive/handler.go b/pkg/filesystem/driver/onedrive/handler.go index 2fa85b3..4b76bfc 100644 --- a/pkg/filesystem/driver/onedrive/handler.go +++ b/pkg/filesystem/driver/onedrive/handler.go @@ -226,16 +226,6 @@ func (handler Driver) replaceSourceHost(origin string) (string, error) { func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) { fileInfo := file.Info() - // 如果小于4MB,则由服务端中转 - if fileInfo.Size <= SmallFileSize { - return nil, nil - } - - // 生成回调地址 - siteURL := model.GetSiteURL() - apiBaseURI, _ := url.Parse("/api/v3/callback/onedrive/finish/" + uploadSession.Key) - apiURL := siteURL.ResolveReference(apiBaseURI) - uploadURL, err := handler.Client.CreateUploadSession(ctx, fileInfo.SavePath, WithConflictBehavior("fail")) if err != nil { return nil, err @@ -244,13 +234,15 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria // 监控回调及上传 go handler.Client.MonitorUpload(uploadURL, uploadSession.Key, fileInfo.SavePath, fileInfo.Size, ttl) + uploadSession.OneDriveUploadURL = uploadURL return &serializer.UploadCredential{ - Policy: uploadURL, - Token: apiURL.String(), + SessionID: uploadSession.Key, + ChunkSize: handler.Policy.OptionsSerialized.ChunkSize, + UploadURLs: []string{uploadURL}, }, nil } // 取消上传凭证 func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { - return nil + return handler.Client.DeleteUploadSession(ctx, uploadSession.OneDriveUploadURL) } diff --git a/pkg/filesystem/driver/onedrive/types.go b/pkg/filesystem/driver/onedrive/types.go index 9dc14fa..aefa638 100644 --- a/pkg/filesystem/driver/onedrive/types.go +++ b/pkg/filesystem/driver/onedrive/types.go @@ -3,7 +3,6 @@ package onedrive import ( "encoding/gob" "net/url" - "sync" ) // RespError 接口返回错误 @@ -148,5 +147,3 @@ func init() { func (chunk *Chunk) IsLast() bool { return chunk.Total-chunk.Offset == chunk.ChunkSize } - -var callbackSignal sync.Map diff --git a/pkg/filesystem/driver/remote/client.go b/pkg/filesystem/driver/remote/client.go index ab764f5..17bbb92 100644 --- a/pkg/filesystem/driver/remote/client.go +++ b/pkg/filesystem/driver/remote/client.go @@ -90,7 +90,7 @@ func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error // Initial chunk groups chunks := chunk.NewChunkGroup(file, c.policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{ - Max: model.GetIntSetting("onedrive_chunk_retries", 1), + Max: model.GetIntSetting("slave_chunk_retries", 5), Sleep: chunkRetrySleep, }) diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go index 38d42ef..71876ae 100644 --- a/pkg/filesystem/upload.go +++ b/pkg/filesystem/upload.go @@ -174,9 +174,6 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS fs.Use("BeforeUpload", HookValidateFile) fs.Use("BeforeUpload", HookValidateCapacity) - if !fs.Policy.IsUploadPlaceholderWithSize() { - fs.Use("AfterUpload", HookClearFileHeaderSize) - } // 验证文件规格 if err := fs.Upload(ctx, file); err != nil { @@ -202,6 +199,9 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS } // 创建占位符 + if !fs.Policy.IsUploadPlaceholderWithSize() { + fs.Use("AfterUpload", HookClearFileHeaderSize) + } fs.Use("AfterUpload", GenericAfterUpload) if err := fs.Upload(ctx, file); err != nil { return nil, err diff --git a/pkg/serializer/upload.go b/pkg/serializer/upload.go index 225c493..485b843 100644 --- a/pkg/serializer/upload.go +++ b/pkg/serializer/upload.go @@ -31,22 +31,23 @@ type UploadCredential struct { Path string `json:"path"` // 存储路径 AccessKey string `json:"ak"` KeyTime string `json:"key_time,omitempty"` // COS用有效期 - Callback string `json:"callback,omitempty"` // 回调地址 Key string `json:"key,omitempty"` // 文件标识符,通常为回调key + Callback string `json:"callback,omitempty"` // 回调地址 } // UploadSession 上传会话 type UploadSession struct { - Key string // 上传会话 GUID - UID uint // 发起者 - VirtualPath string // 用户文件路径,不含文件名 - Name string // 文件名 - Size uint64 // 文件大小 - SavePath string // 物理存储路径,包含物理文件名 - LastModified *time.Time // 可选的文件最后修改日期 - Policy model.Policy - Callback string // 回调 URL 地址 - CallbackSecret string // 回调 URL + Key string // 上传会话 GUID + UID uint // 发起者 + VirtualPath string // 用户文件路径,不含文件名 + Name string // 文件名 + Size uint64 // 文件大小 + SavePath string // 物理存储路径,包含物理文件名 + LastModified *time.Time // 可选的文件最后修改日期 + Policy model.Policy + Callback string // 回调 URL 地址 + CallbackSecret string // 回调 URL + OneDriveUploadURL string } // UploadCallback 上传回调正文 diff --git a/routers/router.go b/routers/router.go index 21f42ae..9f61a33 100644 --- a/routers/router.go +++ b/routers/router.go @@ -264,11 +264,12 @@ func InitMasterRouter() *gin.Engine { { // 文件上传完成 onedrive.POST( - "finish/:key", + "finish/:sessionID", + middleware.UseUploadSession("onedrive"), middleware.OneDriveCallbackAuth(), controllers.OneDriveCallback, ) - // 文件上传完成 + // OAuth 完成 onedrive.GET( "auth", controllers.OneDriveOAuth, diff --git a/service/callback/upload.go b/service/callback/upload.go index df5d180..4f0495e 100644 --- a/service/callback/upload.go +++ b/service/callback/upload.go @@ -50,7 +50,6 @@ type UpyunCallbackService struct { // OneDriveCallback OneDrive 客户端回调正文 type OneDriveCallback struct { - ID string `json:"id" binding:"required"` Meta *onedrive.FileInfo } @@ -165,22 +164,21 @@ func (service *OneDriveCallback) PreProcess(c *gin.Context) serializer.Response defer fs.Recycle() // 获取回调会话 - callbackSessionRaw, _ := c.Get("callbackSession") - callbackSession := callbackSessionRaw.(*serializer.UploadSession) + uploadSession := c.MustGet(filesystem.UploadSessionCtx).(*serializer.UploadSession) // 获取文件信息 - info, err := fs.Handler.(onedrive.Driver).Client.Meta(context.Background(), service.ID, "") + info, err := fs.Handler.(onedrive.Driver).Client.Meta(context.Background(), "", uploadSession.SavePath) if err != nil { return serializer.Err(serializer.CodeUploadFailed, "文件元信息查询失败", err) } // 验证与回调会话中是否一致 - actualPath := strings.TrimPrefix(callbackSession.SavePath, "/") - isSizeCheckFailed := callbackSession.Size != info.Size + actualPath := strings.TrimPrefix(uploadSession.SavePath, "/") + isSizeCheckFailed := uploadSession.Size != info.Size - // SharePoint 会对 Office 文档增加 meta data 导致文件大小不一致,这里增加 10 KB 宽容 + // SharePoint 会对 Office 文档增加 meta data 导致文件大小不一致,这里增加 100 KB 宽容 // See: https://github.com/OneDrive/onedrive-api-docs/issues/935 - if strings.Contains(fs.Policy.OptionsSerialized.OdDriver, "sharepoint.com") && isSizeCheckFailed && (info.Size > callbackSession.Size) && (info.Size-callbackSession.Size <= 10240) { + if strings.Contains(fs.Policy.OptionsSerialized.OdDriver, "sharepoint.com") && isSizeCheckFailed && (info.Size > uploadSession.Size) && (info.Size-uploadSession.Size <= 102400) { isSizeCheckFailed = false }