From febbd0c5a0726926471553610c2675571707b5fc Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Wed, 13 Apr 2022 17:53:46 +0800 Subject: [PATCH] Feat: batch download in streamming paradism Fix: add cache-controler header in API call responses --- assets | 2 +- middleware/{option.go => common.go} | 7 +++ middleware/{option_test.go => common_test.go} | 0 models/user.go | 5 ++ pkg/filesystem/archive.go | 49 +++++-------------- pkg/task/compress.go | 26 ++++++++-- routers/controllers/file.go | 9 ++-- routers/controllers/share.go | 2 +- routers/router.go | 10 +++- service/explorer/file.go | 45 ++++++++++------- service/explorer/objects.go | 40 +++++---------- 11 files changed, 98 insertions(+), 97 deletions(-) rename middleware/{option.go => common.go} (85%) rename middleware/{option_test.go => common_test.go} (100%) diff --git a/assets b/assets index 907f865..da7a3a3 160000 --- a/assets +++ b/assets @@ -1 +1 @@ -Subproject commit 907f86550d759afb9b156515f25e60fff5a1d29a +Subproject commit da7a3a38bf924d53072a7c56d144b0e777ea0511 diff --git a/middleware/option.go b/middleware/common.go similarity index 85% rename from middleware/option.go rename to middleware/common.go index daa3753..c3a2283 100644 --- a/middleware/option.go +++ b/middleware/common.go @@ -38,3 +38,10 @@ func IsFunctionEnabled(key string) gin.HandlerFunc { c.Next() } } + +// CacheControl 屏蔽客户端缓存 +func CacheControl() gin.HandlerFunc { + return func(c *gin.Context) { + c.Header("Cache-Control", "private, no-cache") + } +} diff --git a/middleware/option_test.go b/middleware/common_test.go similarity index 100% rename from middleware/option_test.go rename to middleware/common_test.go diff --git a/models/user.go b/models/user.go index b311167..cec6ec9 100644 --- a/models/user.go +++ b/models/user.go @@ -3,6 +3,7 @@ package model import ( "crypto/md5" "crypto/sha1" + "encoding/gob" "encoding/hex" "encoding/json" "strings" @@ -46,6 +47,10 @@ type User struct { OptionsSerialized UserOption `gorm:"-"` } +func init() { + gob.Register(User{}) +} + // UserOption 用户个性化配置字段 type UserOption struct { ProfileOff bool `json:"profile_off,omitempty"` diff --git a/pkg/filesystem/archive.go b/pkg/filesystem/archive.go index cca49eb..4468c45 100644 --- a/pkg/filesystem/archive.go +++ b/pkg/filesystem/archive.go @@ -28,17 +28,17 @@ import ( */ // Compress 创建给定目录和文件的压缩文件 -func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, isArchive bool) (string, error) { +func (fs *FileSystem) Compress(ctx context.Context, writer io.Writer, folderIDs, fileIDs []uint, isArchive bool) error { // 查找待压缩目录 folders, err := model.GetFoldersByIDs(folderIDs, fs.User.ID) if err != nil && len(folderIDs) != 0 { - return "", ErrDBListObjects + return ErrDBListObjects } // 查找待压缩文件 files, err := model.GetFilesByIDs(fileIDs, fs.User.ID) if err != nil && len(fileIDs) != 0 { - return "", ErrDBListObjects + return ErrDBListObjects } // 如果上下文限制了父目录,则进行检查 @@ -46,14 +46,14 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i // 检查目录 for _, folder := range folders { if *folder.ParentID != parent.ID { - return "", ErrObjectNotExist + return ErrObjectNotExist } } // 检查文件 for _, file := range files { if file.FolderID != parent.ID { - return "", ErrObjectNotExist + return ErrObjectNotExist } } } @@ -73,25 +73,8 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i files[i].Position = "" } - // 创建临时压缩文件 - saveFolder := "archive" - if !isArchive { - saveFolder = "compress" - } - zipFilePath := filepath.Join( - util.RelativePath(model.GetSettingByName("temp_path")), - saveFolder, - fmt.Sprintf("archive_%d.zip", time.Now().UnixNano()), - ) - zipFile, err := util.CreatNestedFile(zipFilePath) - if err != nil { - util.Log().Warning("%s", err) - return "", err - } - defer zipFile.Close() - // 创建压缩文件Writer - zipWriter := zip.NewWriter(zipFile) + zipWriter := zip.NewWriter(writer) defer zipWriter.Close() ctx = reqContext @@ -101,10 +84,9 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i select { case <-reqContext.Done(): // 取消压缩请求 - fs.cancelCompress(ctx, zipWriter, zipFile, zipFilePath) - return "", ErrClientCanceled + return ErrClientCanceled default: - fs.doCompress(ctx, nil, &folders[i], zipWriter, isArchive) + fs.doCompress(reqContext, nil, &folders[i], zipWriter, isArchive) } } @@ -112,22 +94,13 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i select { case <-reqContext.Done(): // 取消压缩请求 - fs.cancelCompress(ctx, zipWriter, zipFile, zipFilePath) - return "", ErrClientCanceled + return ErrClientCanceled default: - fs.doCompress(ctx, &files[i], nil, zipWriter, isArchive) + fs.doCompress(reqContext, &files[i], nil, zipWriter, isArchive) } } - return zipFilePath, nil -} - -// cancelCompress 取消压缩进程 -func (fs *FileSystem) cancelCompress(ctx context.Context, zipWriter *zip.Writer, file *os.File, path string) { - util.Log().Debug("客户端取消压缩请求") - zipWriter.Close() - file.Close() - _ = os.Remove(path) + return nil } func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *model.Folder, zipWriter *zip.Writer, isArchive bool) { diff --git a/pkg/task/compress.go b/pkg/task/compress.go index f7314ec..50b21e3 100644 --- a/pkg/task/compress.go +++ b/pkg/task/compress.go @@ -3,7 +3,10 @@ package task import ( "context" "encoding/json" + "fmt" "os" + "path/filepath" + "time" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" @@ -93,20 +96,37 @@ func (job *CompressTask) Do() { util.Log().Debug("开始压缩文件") job.TaskModel.SetProgress(CompressingProgress) + // 创建临时压缩文件 + saveFolder := "compress" + zipFilePath := filepath.Join( + util.RelativePath(model.GetSettingByName("temp_path")), + saveFolder, + fmt.Sprintf("archive_%d.zip", time.Now().UnixNano()), + ) + zipFile, err := util.CreatNestedFile(zipFilePath) + if err != nil { + util.Log().Warning("%s", err) + job.SetErrorMsg(err.Error()) + return + } + + defer zipFile.Close() + // 开始压缩 ctx := context.Background() - zipFile, err := fs.Compress(ctx, job.TaskProps.Dirs, job.TaskProps.Files, false) + err = fs.Compress(ctx, zipFile, job.TaskProps.Dirs, job.TaskProps.Files, false) if err != nil { job.SetErrorMsg(err.Error()) return } - job.zipPath = zipFile + job.zipPath = zipFilePath + zipFile.Close() util.Log().Debug("压缩文件存放至%s,开始上传", zipFile) job.TaskModel.SetProgress(TransferringProgress) // 上传文件 - err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, 0) + err = fs.UploadFromPath(ctx, zipFilePath, job.TaskProps.Dst, 0) if err != nil { job.SetErrorMsg(err.Error()) return diff --git a/routers/controllers/file.go b/routers/controllers/file.go index 1141d7a..67bbb73 100644 --- a/routers/controllers/file.go +++ b/routers/controllers/file.go @@ -18,12 +18,9 @@ func DownloadArchive(c *gin.Context) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var service explorer.DownloadService + var service explorer.ArchiveService if err := c.ShouldBindUri(&service); err == nil { - res := service.DownloadArchived(ctx, c) - if res.Code != 0 { - c.JSON(200, res) - } + service.DownloadArchived(ctx, c) } else { c.JSON(200, ErrorResponse(err)) } @@ -189,7 +186,7 @@ func Preview(c *gin.Context) { res := service.PreviewContent(ctx, c, false) // 是否需要重定向 if res.Code == -301 { - c.Redirect(301, res.Data.(string)) + c.Redirect(302, res.Data.(string)) return } // 是否有错误发生 diff --git a/routers/controllers/share.go b/routers/controllers/share.go index 10e8980..88b476d 100644 --- a/routers/controllers/share.go +++ b/routers/controllers/share.go @@ -100,7 +100,7 @@ func PreviewShare(c *gin.Context) { res := service.PreviewContent(ctx, c, false) // 是否需要重定向 if res.Code == -301 { - c.Redirect(301, res.Data.(string)) + c.Redirect(302, res.Data.(string)) return } // 是否有错误发生 diff --git a/routers/router.go b/routers/router.go index 962179f..da2cc4c 100644 --- a/routers/router.go +++ b/routers/router.go @@ -34,6 +34,8 @@ func InitSlaveRouter() *gin.Engine { v3.Use(middleware.SignRequired(auth.General)) // 主机信息解析 v3.Use(middleware.MasterMetadata()) + // 禁止缓存 + v3.Use(middleware.CacheControl()) /* 路由 @@ -133,8 +135,12 @@ func InitMasterRouter() *gin.Engine { if gin.Mode() == gin.TestMode { v3.Use(middleware.MockHelper()) } + // 用户会话 v3.Use(middleware.CurrentUser()) + // 禁止缓存 + v3.Use(middleware.CacheControl()) + /* 路由 */ @@ -205,10 +211,10 @@ func InitMasterRouter() *gin.Engine { file.GET("get/:id/:name", controllers.AnonymousGetContent) // 文件外链(301跳转) file.GET("source/:id/:name", controllers.AnonymousPermLink) - // 下載已经打包好的文件 - file.GET("archive/:id/archive.zip", controllers.DownloadArchive) // 下载文件 file.GET("download/:id", controllers.Download) + // 打包并下载文件 + file.GET("archive/:sessionID/archive.zip", controllers.DownloadArchive) } } diff --git a/service/explorer/file.go b/service/explorer/file.go index cc255bd..5899880 100644 --- a/service/explorer/file.go +++ b/service/explorer/file.go @@ -12,7 +12,6 @@ import ( "path" "strconv" "strings" - "time" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cache" @@ -42,6 +41,11 @@ type DownloadService struct { ID string `uri:"id" binding:"required"` } +// ArchiveService 文件流式打包下載服务 +type ArchiveService struct { + ID string `uri:"sessionID" binding:"required"` +} + // New 创建新文件 func (service *SingleFileService) Create(c *gin.Context) serializer.Response { // 创建文件系统 @@ -93,8 +97,14 @@ func (service *SlaveListService) List(c *gin.Context) serializer.Response { return serializer.Response{Data: string(res)} } -// DownloadArchived 下載已打包的多文件 -func (service *DownloadService) DownloadArchived(ctx context.Context, c *gin.Context) serializer.Response { +// DownloadArchived 通过预签名 URL 打包下载 +func (service *ArchiveService) DownloadArchived(ctx context.Context, c *gin.Context) serializer.Response { + userRaw, exist := cache.Get("archive_user_" + service.ID) + if !exist { + return serializer.Err(404, "归档会话不存在", nil) + } + user := userRaw.(model.User) + // 创建文件系统 fs, err := filesystem.NewFileSystemFromContext(c) if err != nil { @@ -103,31 +113,28 @@ func (service *DownloadService) DownloadArchived(ctx context.Context, c *gin.Con defer fs.Recycle() // 查找打包的临时文件 - zipPath, exist := cache.Get("archive_" + service.ID) + archiveSession, exist := cache.Get("archive_" + service.ID) if !exist { - return serializer.Err(404, "归档文件不存在", nil) + return serializer.Err(404, "归档会话不存在", nil) } - // 获取文件流 - rs, err := fs.GetPhysicalFileContent(ctx, zipPath.(string)) - defer rs.Close() - if err != nil { - return serializer.Err(serializer.CodeNotSet, err.Error(), err) - } - - if fs.User.Group.OptionsSerialized.OneTimeDownload { - // 清理资源,删除临时文件 - _ = cache.Deletes([]string{service.ID}, "archive_") - } + // 清理打包会话 + _ = cache.Deletes([]string{service.ID, "user_" + service.ID}, "archive_") + // 开始打包 c.Header("Content-Disposition", "attachment;") c.Header("Content-Type", "application/zip") - http.ServeContent(c.Writer, c.Request, "", time.Now(), rs) + itemService := archiveSession.(ItemIDService) + items := itemService.Raw() + ctx = context.WithValue(ctx, fsctx.GinCtx, c) + err = fs.Compress(ctx, c.Writer, items.Dirs, items.Items, true) + if err != nil { + return serializer.Err(serializer.CodeNotSet, "无法创建压缩文件", err) + } return serializer.Response{ Code: 0, } - } // Download 签名的匿名文件下载 @@ -261,7 +268,7 @@ func (service *FileIDService) CreateDownloadSession(ctx context.Context, c *gin. // Download 通过签名URL的文件下载,无需登录 func (service *DownloadService) Download(ctx context.Context, c *gin.Context) serializer.Response { // 创建文件系统 - fs, err := filesystem.NewFileSystemFromContext(c) + fs, err := filesystem.NewFileSystem(&user) if err != nil { return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) } diff --git a/service/explorer/objects.go b/service/explorer/objects.go index e15589f..dab5850 100644 --- a/service/explorer/objects.go +++ b/service/explorer/objects.go @@ -2,9 +2,9 @@ package explorer import ( "context" + "encoding/gob" "fmt" "math" - "net/url" "path" "strings" "time" @@ -13,7 +13,6 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" - "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/hashid" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/task" @@ -67,6 +66,10 @@ type ItemPropertyService struct { IsFolder bool `form:"is_folder"` } +func init() { + gob.Register(ItemIDService{}) +} + // Raw 批量解码HashID,获取原始ID func (service *ItemIDService) Raw() *ItemService { if service.Source != nil { @@ -232,37 +235,20 @@ func (service *ItemIDService) Archive(ctx context.Context, c *gin.Context) seria return serializer.Err(serializer.CodeGroupNotAllowed, "当前用户组无法进行此操作", nil) } - // 开始压缩 - ctx = context.WithValue(ctx, fsctx.GinCtx, c) - items := service.Raw() - zipFile, err := fs.Compress(ctx, items.Dirs, items.Items, true) - if err != nil { - return serializer.Err(serializer.CodeNotSet, "无法创建压缩文件", err) - } - - // 生成一次性压缩文件下载地址 - siteURL, err := url.Parse(model.GetSettingByName("siteURL")) - if err != nil { - return serializer.Err(serializer.CodeNotSet, "无法解析站点URL", err) - } - zipID := util.RandStringRunes(16) + // 创建打包下载会话 ttl := model.GetIntSetting("archive_timeout", 30) - signedURI, err := auth.SignURI( + downloadSessionID := util.RandStringRunes(16) + cache.Set("archive_"+downloadSessionID, *service, ttl) + cache.Set("archive_user_"+downloadSessionID, *fs.User, ttl) + signURL, err := auth.SignURI( auth.General, - fmt.Sprintf("/api/v3/file/archive/%s/archive.zip", zipID), - time.Now().Unix()+int64(ttl), + fmt.Sprintf("/api/v3/file/archive/%s/archive.zip", downloadSessionID), + int64(ttl), ) - finalURL := siteURL.ResolveReference(signedURI).String() - - // 将压缩文件记录存入缓存 - err = cache.Set("archive_"+zipID, zipFile, ttl) - if err != nil { - return serializer.Err(serializer.CodeIOFailed, "无法写入缓存", err) - } return serializer.Response{ Code: 0, - Data: finalURL, + Data: signURL.String(), } }