From 3444b4a75eb76741780c4d4533bb4560207efa57 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sun, 27 Feb 2022 14:13:39 +0800 Subject: [PATCH] Feat: chunk upload handling for local policy --- models/file.go | 21 +++- pkg/filesystem/driver/cos/handler.go | 2 + pkg/filesystem/driver/local/handler.go | 47 +++++++- pkg/filesystem/driver/onedrive/handler.go | 1 + pkg/filesystem/driver/s3/handler.go | 1 + .../driver/shadow/slaveinmaster/handler.go | 2 + pkg/filesystem/filesystem.go | 6 +- pkg/filesystem/fsctx/stream.go | 27 +++-- pkg/filesystem/fsctx/taskinfo/taskinfo.go | 1 + pkg/filesystem/hooks.go | 65 ++++++++--- pkg/filesystem/upload.go | 43 +++---- pkg/request/request.go | 5 +- pkg/serializer/error.go | 6 + pkg/serializer/upload.go | 8 +- pkg/webdav/webdav.go | 3 +- routers/controllers/file.go | 2 + service/explorer/upload.go | 107 +++++++++++++++++- 17 files changed, 280 insertions(+), 67 deletions(-) create mode 100644 pkg/filesystem/fsctx/taskinfo/taskinfo.go diff --git a/models/file.go b/models/file.go index 92c8f82..7c472ef 100644 --- a/models/file.go +++ b/models/file.go @@ -200,6 +200,13 @@ func GetFilesByParentIDs(ids []uint, uid uint) ([]File, error) { return files, result.Error } +// GetFilesByUploadSession 查找上传会话对应的文件 +func GetFilesByUploadSession(sessionID string, uid uint) (*File, error) { + file := File{} + result := DB.Where("user_id = ? and upload_session_id = ?", uid, sessionID).Find(&file) + return &file, result.Error +} + // Rename 重命名文件 func (file *File) Rename(new string) error { return DB.Model(&file).Update("name", new).Error @@ -207,7 +214,7 @@ func (file *File) Rename(new string) error { // UpdatePicInfo 更新文件的图像信息 func (file *File) UpdatePicInfo(value string) error { - return DB.Model(&file).Set("gorm:association_autoupdate", false).Update("pic_info", value).Error + return DB.Model(&file).Set("gorm:association_autoupdate", false).UpdateColumns(File{PicInfo: value}).Error } // UpdateSize 更新文件的大小信息 @@ -220,6 +227,18 @@ func (file *File) UpdateSourceName(value string) error { return DB.Model(&file).Set("gorm:association_autoupdate", false).Update("source_name", value).Error } +func (file *File) PopChunkToFile(lastModified *time.Time) error { + file.UploadSessionID = nil + if lastModified != nil { + file.UpdatedAt = *lastModified + } + + return DB.Model(file).UpdateColumns(map[string]interface{}{ + "upload_session_id": file.UploadSessionID, + "updated_at": file.UpdatedAt, + }).Error +} + // CanCopy 返回文件是否可被复制 func (file *File) CanCopy() bool { return file.UploadSessionID == nil diff --git a/pkg/filesystem/driver/cos/handler.go b/pkg/filesystem/driver/cos/handler.go index 505af1b..efad19f 100644 --- a/pkg/filesystem/driver/cos/handler.go +++ b/pkg/filesystem/driver/cos/handler.go @@ -184,6 +184,8 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser, // Put 将文件流保存到指定目录 func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error { + defer file.Close() + opt := &cossdk.ObjectPutOptions{} _, err := handler.Client.Object.Put(ctx, file.Info().SavePath, file, opt) return err diff --git a/pkg/filesystem/driver/local/handler.go b/pkg/filesystem/driver/local/handler.go index 7985038..3c428ca 100644 --- a/pkg/filesystem/driver/local/handler.go +++ b/pkg/filesystem/driver/local/handler.go @@ -19,6 +19,10 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/util" ) +const ( + Perm = 0744 +) + // Driver 本地策略适配器 type Driver struct { Policy *model.Policy @@ -99,26 +103,61 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error { // 如果目标目录不存在,创建 basePath := filepath.Dir(dst) if !util.Exists(basePath) { - err := os.MkdirAll(basePath, 0744) + err := os.MkdirAll(basePath, Perm) if err != nil { util.Log().Warning("无法创建目录,%s", err) return err } } - // 创建目标文件 - out, err := os.Create(dst) + var ( + out *os.File + err error + ) + + if fileInfo.Mode == fsctx.Append { + // 如果是追加模式,则直接打开文件 + out, err = os.OpenFile(dst, os.O_APPEND|os.O_CREATE|os.O_WRONLY, Perm) + } else { + // 创建或覆盖目标文件 + out, err = os.Create(dst) + } + if err != nil { - util.Log().Warning("无法创建文件,%s", err) + util.Log().Warning("无法打开或创建文件,%s", err) return err } defer out.Close() + if fileInfo.Mode == fsctx.Append { + stat, err := out.Stat() + if err != nil { + util.Log().Warning("无法读取文件信息,%s", err) + return err + } + + if uint64(stat.Size()) != fileInfo.AppendStart { + return errors.New("未上传完成的文件分片与预期大小不一致") + } + } + // 写入文件内容 _, err = io.Copy(out, file) return err } +func (handler Driver) Truncate(ctx context.Context, src string, size uint64) error { + util.Log().Warning("截断文件 [%s] 至 [%d]", src, size) + out, err := os.OpenFile(src, os.O_WRONLY, Perm) + if err != nil { + util.Log().Warning("无法打开或创建文件,%s", err) + return err + } + + defer out.Close() + return out.Truncate(int64(size)) +} + // Delete 删除一个或多个文件, // 返回未删除的文件,及遇到的最后一个错误 func (handler Driver) Delete(ctx context.Context, files []string) ([]string, error) { diff --git a/pkg/filesystem/driver/onedrive/handler.go b/pkg/filesystem/driver/onedrive/handler.go index df14f04..7d043d9 100644 --- a/pkg/filesystem/driver/onedrive/handler.go +++ b/pkg/filesystem/driver/onedrive/handler.go @@ -122,6 +122,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser, // Put 将文件流保存到指定目录 func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error { defer file.Close() + return handler.Client.Upload(ctx, file) } diff --git a/pkg/filesystem/driver/s3/handler.go b/pkg/filesystem/driver/s3/handler.go index da3ae4a..8152ab7 100644 --- a/pkg/filesystem/driver/s3/handler.go +++ b/pkg/filesystem/driver/s3/handler.go @@ -198,6 +198,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser, // Put 将文件流保存到指定目录 func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error { + defer file.Close() // 初始化客户端 if err := handler.InitS3Client(); err != nil { diff --git a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go index 988e5b9..92fee1c 100644 --- a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go @@ -50,6 +50,8 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) // Put 将ctx中指定的从机物理文件由从机上传到目标存储策略 func (d *Driver) Put(ctx context.Context, file fsctx.FileHeader) error { + defer file.Close() + src, ok := ctx.Value(fsctx.SlaveSrcPath).(string) if !ok { return ErrSlaveSrcPathNotExist diff --git a/pkg/filesystem/filesystem.go b/pkg/filesystem/filesystem.go index 8597960..8b000f0 100644 --- a/pkg/filesystem/filesystem.go +++ b/pkg/filesystem/filesystem.go @@ -213,11 +213,7 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) { callbackSession := callbackSessionRaw.(*serializer.UploadSession) // 重新指向上传策略 - policy, err := model.GetPolicyByID(callbackSession.PolicyID) - if err != nil { - return nil, err - } - fs.Policy = &policy + fs.Policy = &callbackSession.Policy fs.User.Policy = policy err = fs.DispatchHandler() diff --git a/pkg/filesystem/fsctx/stream.go b/pkg/filesystem/fsctx/stream.go index d141800..53ff7a4 100644 --- a/pkg/filesystem/fsctx/stream.go +++ b/pkg/filesystem/fsctx/stream.go @@ -15,14 +15,6 @@ const ( Nop ) -// FileHeader 上传来的文件数据处理器 -type FileHeader interface { - io.Reader - io.Closer - Info() *UploadTaskInfo - SetSize(uint64) -} - type UploadTaskInfo struct { Size uint64 MIMEType string @@ -33,6 +25,17 @@ type UploadTaskInfo struct { LastModified *time.Time SavePath string UploadSessionID *string + AppendStart uint64 + Model interface{} +} + +// FileHeader 上传来的文件数据处理器 +type FileHeader interface { + io.Reader + io.Closer + Info() *UploadTaskInfo + SetSize(uint64) + SetModel(fileModel interface{}) } // FileStream 用户传来的文件 @@ -47,6 +50,8 @@ type FileStream struct { MIMEType string SavePath string UploadSessionID *string + AppendStart uint64 + Model interface{} } func (file *FileStream) Read(p []byte) (n int, err error) { @@ -68,9 +73,15 @@ func (file *FileStream) Info() *UploadTaskInfo { LastModified: file.LastModified, SavePath: file.SavePath, UploadSessionID: file.UploadSessionID, + AppendStart: file.AppendStart, + Model: file.Model, } } func (file *FileStream) SetSize(size uint64) { file.Size = size } + +func (file *FileStream) SetModel(fileModel interface{}) { + file.Model = fileModel +} diff --git a/pkg/filesystem/fsctx/taskinfo/taskinfo.go b/pkg/filesystem/fsctx/taskinfo/taskinfo.go new file mode 100644 index 0000000..1899c1b --- /dev/null +++ b/pkg/filesystem/fsctx/taskinfo/taskinfo.go @@ -0,0 +1 @@ +package taskinfo diff --git a/pkg/filesystem/hooks.go b/pkg/filesystem/hooks.go index 42535e7..cf30d2c 100644 --- a/pkg/filesystem/hooks.go +++ b/pkg/filesystem/hooks.go @@ -8,7 +8,9 @@ import ( "sync" model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/conf" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/local" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" @@ -223,25 +225,13 @@ func GenericAfterUpdate(ctx context.Context, fs *FileSystem, newFile fsctx.FileH return ErrObjectNotExist } - fs.SetTargetFile(&[]model.File{originFile}) + newFile.SetModel(&originFile) err := originFile.UpdateSize(newFile.Info().Size) if err != nil { return err } - // 尝试清空原有缩略图并重新生成 - if originFile.GetPolicy().IsThumbGenerateNeeded() { - fs.recycleLock.Lock() - go func() { - defer fs.recycleLock.Unlock() - if originFile.PicInfo != "" { - _, _ = fs.Handler.Delete(ctx, []string{originFile.SourceName + conf.ThumbConfig.FileSuffix}) - fs.GenerateThumbnail(ctx, &originFile) - } - }() - } - return nil } @@ -302,17 +292,23 @@ func GenericAfterUpload(ctx context.Context, fs *FileSystem, fileHeader fsctx.Fi if err != nil { return ErrInsertFileRecord } - fs.SetTargetFile(&[]model.File{*file}) + fileHeader.SetModel(file) + return nil +} + +// HookGenerateThumb 生成缩略图 +func HookGenerateThumb(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error { // 异步尝试生成缩略图 + fileMode := fileHeader.Info().Model.(*model.File) if fs.Policy.IsThumbGenerateNeeded() { fs.recycleLock.Lock() go func() { defer fs.recycleLock.Unlock() - fs.GenerateThumbnail(ctx, file) + _, _ = fs.Handler.Delete(ctx, []string{fileMode.SourceName + conf.ThumbConfig.FileSuffix}) + fs.GenerateThumbnail(ctx, fileMode) }() } - return nil } @@ -321,3 +317,40 @@ func HookClearFileHeaderSize(ctx context.Context, fs *FileSystem, fileHeader fsc fileHeader.SetSize(0) return nil } + +// HookTruncateFileTo 将物理文件截断至 size +func HookTruncateFileTo(size uint64) Hook { + return func(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error { + if fs.Policy.Type == "local" { + if driver, ok := fs.Handler.(local.Driver); ok { + return driver.Truncate(ctx, fileHeader.Info().SavePath, size) + } + } + + return nil + } +} + +// HookChunkUploadFinished 单个分片上传结束后 +func HookChunkUploaded(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error { + fileInfo := fileHeader.Info() + + // 更新文件大小 + return fileInfo.Model.(*model.File).UpdateSize(fileInfo.Model.(*model.File).GetSize() + fileInfo.Size) +} + +// HookChunkUploadFinished 分片上传结束后处理文件 +func HookChunkUploadFinished(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error { + fileInfo := fileHeader.Info() + fileModel := fileInfo.Model.(*model.File) + + return fileModel.PopChunkToFile(fileInfo.LastModified) +} + +// HookChunkUploadFinished 分片上传结束后处理文件 +func HookDeleteUploadSession(id string) Hook { + return func(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error { + cache.Deletes([]string{id}, UploadSessionCachePrefix) + return nil + } +} diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go index 98a3d79..d00805d 100644 --- a/pkg/filesystem/upload.go +++ b/pkg/filesystem/upload.go @@ -22,7 +22,8 @@ import ( */ const ( - UploadSessionMetaKey = "upload_session" + UploadSessionMetaKey = "upload_session" + UploadSessionCachePrefix = "callback_" ) // Upload 上传文件 @@ -36,13 +37,15 @@ func (fs *FileSystem) Upload(ctx context.Context, file *fsctx.FileStream) (err e // 生成文件名和路径, var savePath string - // 如果是更新操作就从上下文中获取 - if originFile, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok { - savePath = originFile.SourceName - } else { - savePath = fs.GenerateSavePath(ctx, file) + if file.SavePath == "" { + // 如果是更新操作就从上下文中获取 + if originFile, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok { + savePath = originFile.SourceName + } else { + savePath = fs.GenerateSavePath(ctx, file) + } + file.SavePath = savePath } - file.SavePath = savePath // 处理客户端未完成上传时,关闭连接 go fs.CancelUpload(ctx, savePath, file) @@ -70,14 +73,15 @@ func (fs *FileSystem) Upload(ctx context.Context, file *fsctx.FileStream) (err e return err } - fileInfo := file.Info() - util.Log().Info( - "新文件PUT:%s , 大小:%d, 上传者:%s", - fileInfo.FileName, - fileInfo.Size, - fs.User.Nick, - ) - + if file.Mode == fsctx.Create { + fileInfo := file.Info() + util.Log().Info( + "新文件PUT:%s , 大小:%d, 上传者:%s", + fileInfo.FileName, + fileInfo.Size, + fs.User.Nick, + ) + } return nil } @@ -159,6 +163,7 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS callBackSessionTTL := model.GetIntSetting("upload_session_timeout", 86400) callbackKey := uuid.Must(uuid.NewV4()).String() + fileSize := file.Size // 创建占位的文件,同时校验文件信息 file.Mode = fsctx.Nop @@ -177,12 +182,11 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS uploadSession := &serializer.UploadSession{ Key: callbackKey, UID: fs.User.ID, - PolicyID: fs.Policy.ID, + Policy: *fs.Policy, VirtualPath: file.VirtualPath, Name: file.Name, - Size: file.Size, + Size: fileSize, SavePath: file.SavePath, - ChunkSize: fs.Policy.OptionsSerialized.ChunkSize, LastModified: file.LastModified, } @@ -194,7 +198,7 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS // 创建回调会话 err = cache.Set( - "callback_"+callbackKey, + UploadSessionCachePrefix+callbackKey, uploadSession, callBackSessionTTL, ) @@ -218,6 +222,7 @@ func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStre fs.Use("AfterUploadCanceled", HookDeleteTempFile) fs.Use("AfterUploadCanceled", HookGiveBackCapacity) fs.Use("AfterUpload", GenericAfterUpload) + fs.Use("AfterUpload", HookGenerateThumb) fs.Use("AfterValidateFailed", HookDeleteTempFile) fs.Use("AfterValidateFailed", HookGiveBackCapacity) fs.Use("AfterUploadFailed", HookGiveBackCapacity) diff --git a/pkg/request/request.go b/pkg/request/request.go index 94a088d..7ce579c 100644 --- a/pkg/request/request.go +++ b/pkg/request/request.go @@ -248,9 +248,6 @@ func (instance NopRSCloser) Seek(offset int64, whence int) (int64, error) { // BlackHole 将客户端发来的数据放入黑洞 func BlackHole(r io.Reader) { if !model.IsTrueVal(model.GetSettingByName("reset_after_upload_failed")) { - _, err := io.Copy(ioutil.Discard, r) - if err != nil { - util.Log().Debug("黑洞数据出错,%s", err) - } + io.Copy(ioutil.Discard, r) } } diff --git a/pkg/serializer/error.go b/pkg/serializer/error.go index d5e971c..51e2b7c 100644 --- a/pkg/serializer/error.go +++ b/pkg/serializer/error.go @@ -72,6 +72,12 @@ const ( CodeAdminRequired = 40008 // CodeMasterNotFound 主机节点未注册 CodeMasterNotFound = 40009 + // CodeUploadSessionExpired 上传会话已过期 + CodeUploadSessionExpired = 400011 + // CodeInvalidChunkIndex 无效的分片序号 + CodeInvalidChunkIndex = 400012 + // CodeInvalidContentLength 无效的正文长度 + CodeInvalidContentLength = 400013 // CodeDBError 数据库操作失败 CodeDBError = 50001 // CodeEncryptError 加密失败 diff --git a/pkg/serializer/upload.go b/pkg/serializer/upload.go index 09547bc..e13a5fe 100644 --- a/pkg/serializer/upload.go +++ b/pkg/serializer/upload.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/gob" "encoding/json" + model "github.com/cloudreve/Cloudreve/v3/models" "time" ) @@ -34,15 +35,14 @@ type UploadCredential struct { // UploadSession 上传会话 type UploadSession struct { - Key string // 上传会话 GUID - UID uint // 发起者 - PolicyID uint + Key string // 上传会话 GUID + UID uint // 发起者 VirtualPath string // 用户文件路径,不含文件名 Name string // 文件名 Size uint64 // 文件大小 SavePath string // 物理存储路径,包含物理文件名 - ChunkSize uint64 // 分块大小,0 为不分快 LastModified *time.Time // 可选的文件最后修改日期 + Policy model.Policy } // UploadCallback 上传回调正文 diff --git a/pkg/webdav/webdav.go b/pkg/webdav/webdav.go index 350f18f..10cd822 100644 --- a/pkg/webdav/webdav.go +++ b/pkg/webdav/webdav.go @@ -367,6 +367,7 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request, fs *filesyst fs.Use("AfterUploadCanceled", filesystem.HookGiveBackCapacity) fs.Use("AfterUploadCanceled", filesystem.HookCancelContext) fs.Use("AfterUpload", filesystem.GenericAfterUpload) + fs.Use("AfterUpload", filesystem.HookGenerateThumb) fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile) fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity) fs.Use("AfterUploadFailed", filesystem.HookGiveBackCapacity) @@ -381,7 +382,7 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request, fs *filesyst return http.StatusMethodNotAllowed, err } - etag, err := findETag(ctx, fs, h.LockSystem[fs.User.ID], reqPath, &fs.FileTarget[0]) + etag, err := findETag(ctx, fs, h.LockSystem[fs.User.ID], reqPath, fileData.Model.(*model.File)) if err != nil { return http.StatusInternalServerError, err } diff --git a/routers/controllers/file.go b/routers/controllers/file.go index 83b0c51..129bd45 100644 --- a/routers/controllers/file.go +++ b/routers/controllers/file.go @@ -7,6 +7,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" + "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/service/explorer" "github.com/gin-gonic/gin" @@ -290,6 +291,7 @@ func FileUpload(c *gin.Context) { if err := c.ShouldBindUri(&service); err == nil { res := service.Upload(ctx, c) c.JSON(200, res) + request.BlackHole(c.Request.Body) } else { c.JSON(200, ErrorResponse(err)) } diff --git a/service/explorer/upload.go b/service/explorer/upload.go index 679d126..ff31cce 100644 --- a/service/explorer/upload.go +++ b/service/explorer/upload.go @@ -2,13 +2,18 @@ package explorer import ( "context" - "github.com/cloudreve/Cloudreve/v3/pkg/request" + "fmt" + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/hashid" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/util" "github.com/gin-gonic/gin" + "strconv" + "sync" "time" ) @@ -62,13 +67,105 @@ func (service *UploadSessionService) Create(ctx context.Context, c *gin.Context) // UploadService 本机策略上传服务 type UploadService struct { ID string `uri:"sessionId" binding:"required"` - Index int `uri:"index"` + Index int `uri:"index" binding:"min=0"` } // Upload 处理本机文件分片上传 func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serializer.Response { - request.BlackHole(c.Request.Body) - return serializer.Response{ - Code: 0, + uploadSessionRaw, ok := cache.Get(filesystem.UploadSessionCachePrefix + service.ID) + if !ok { + return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session expired or not exist", nil) + } + + uploadSession := uploadSessionRaw.(serializer.UploadSession) + + fs, err := filesystem.NewFileSystemFromContext(c) + if err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) + } + + // 查找上传会话创建的占位文件 + file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID) + if err != nil { + return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session file placeholder not exist", err) + } + + // 重设 fs 存储策略 + fs.Policy = &uploadSession.Policy + if err := fs.DispatchHandler(); err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, "Unknown storage policy", err) + } + + expectedSizeStart := file.Size + actualSizeStart := uint64(service.Index) * uploadSession.Policy.OptionsSerialized.ChunkSize + if uploadSession.Policy.OptionsSerialized.ChunkSize == 0 && service.Index > 0 { + return serializer.Err(serializer.CodeInvalidChunkIndex, "Chunk index cannot be greater than 0", nil) + } + + if expectedSizeStart < actualSizeStart { + return serializer.Err(serializer.CodeInvalidChunkIndex, "Chunk must be uploaded in order", nil) } + + if expectedSizeStart > actualSizeStart { + util.Log().Warning("尝试上传覆盖分片[%d],数据将被忽略", service.Index) + return serializer.Response{} + } + + return processChunkUpload(ctx, c, fs, &uploadSession, service.Index, file) +} + +func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.FileSystem, session *serializer.UploadSession, index int, file *model.File) serializer.Response { + // 取得并校验文件大小是否符合分片要求 + chunkSize := session.Policy.OptionsSerialized.ChunkSize + isLastChunk := session.Policy.OptionsSerialized.ChunkSize == 0 || uint64(index+1)*chunkSize >= session.Size + expectedLength := chunkSize + if isLastChunk { + expectedLength = session.Size - uint64(index)*chunkSize + } + + fileSize, err := strconv.ParseUint(c.Request.Header.Get("Content-Length"), 10, 64) + if err != nil || fileSize == 0 || (expectedLength != fileSize) { + return serializer.Err( + serializer.CodeInvalidContentLength, + fmt.Sprintf("Invalid Content-Length (expected: %d)", expectedLength), + err, + ) + } + + fileData := fsctx.FileStream{ + MIMEType: c.Request.Header.Get("Content-Type"), + File: c.Request.Body, + Size: fileSize, + Name: session.Name, + VirtualPath: session.VirtualPath, + SavePath: session.SavePath, + Mode: fsctx.Append, + AppendStart: chunkSize * uint64(index), + Model: file, + LastModified: session.LastModified, + } + + // 给文件系统分配钩子 + fs.Use("BeforeUpload", filesystem.HookValidateCapacity) + fs.Use("AfterUploadCanceled", filesystem.HookTruncateFileTo(fileData.AppendStart)) + fs.Use("AfterUploadCanceled", filesystem.HookGiveBackCapacity) + fs.Use("AfterUpload", filesystem.HookChunkUploaded) + if isLastChunk { + fs.Use("AfterUpload", filesystem.HookChunkUploadFinished) + fs.Use("AfterUpload", filesystem.HookGenerateThumb) + fs.Use("AfterUpload", filesystem.HookDeleteUploadSession(session.Key)) + } + fs.Use("AfterValidateFailed", filesystem.HookTruncateFileTo(fileData.AppendStart)) + fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity) + fs.Use("AfterUploadFailed", filesystem.HookGiveBackCapacity) + + // 执行上传 + ctx = context.WithValue(ctx, fsctx.ValidateCapacityOnceCtx, &sync.Once{}) + uploadCtx := context.WithValue(ctx, fsctx.GinCtx, c) + err = fs.Upload(uploadCtx, &fileData) + if err != nil { + return serializer.Err(serializer.CodeUploadFailed, err.Error(), err) + } + + return serializer.Response{} }