Feat: batch download in streamming paradism

Fix: add cache-controler header in API call responses
pull/1217/head
HFO4 3 years ago
parent 32a655f84e
commit febbd0c5a0

@ -1 +1 @@
Subproject commit 907f86550d759afb9b156515f25e60fff5a1d29a Subproject commit da7a3a38bf924d53072a7c56d144b0e777ea0511

@ -38,3 +38,10 @@ func IsFunctionEnabled(key string) gin.HandlerFunc {
c.Next() c.Next()
} }
} }
// CacheControl 屏蔽客户端缓存
func CacheControl() gin.HandlerFunc {
return func(c *gin.Context) {
c.Header("Cache-Control", "private, no-cache")
}
}

@ -3,6 +3,7 @@ package model
import ( import (
"crypto/md5" "crypto/md5"
"crypto/sha1" "crypto/sha1"
"encoding/gob"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"strings" "strings"
@ -46,6 +47,10 @@ type User struct {
OptionsSerialized UserOption `gorm:"-"` OptionsSerialized UserOption `gorm:"-"`
} }
func init() {
gob.Register(User{})
}
// UserOption 用户个性化配置字段 // UserOption 用户个性化配置字段
type UserOption struct { type UserOption struct {
ProfileOff bool `json:"profile_off,omitempty"` ProfileOff bool `json:"profile_off,omitempty"`

@ -28,17 +28,17 @@ import (
*/ */
// Compress 创建给定目录和文件的压缩文件 // Compress 创建给定目录和文件的压缩文件
func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, isArchive bool) (string, error) { func (fs *FileSystem) Compress(ctx context.Context, writer io.Writer, folderIDs, fileIDs []uint, isArchive bool) error {
// 查找待压缩目录 // 查找待压缩目录
folders, err := model.GetFoldersByIDs(folderIDs, fs.User.ID) folders, err := model.GetFoldersByIDs(folderIDs, fs.User.ID)
if err != nil && len(folderIDs) != 0 { if err != nil && len(folderIDs) != 0 {
return "", ErrDBListObjects return ErrDBListObjects
} }
// 查找待压缩文件 // 查找待压缩文件
files, err := model.GetFilesByIDs(fileIDs, fs.User.ID) files, err := model.GetFilesByIDs(fileIDs, fs.User.ID)
if err != nil && len(fileIDs) != 0 { if err != nil && len(fileIDs) != 0 {
return "", ErrDBListObjects return ErrDBListObjects
} }
// 如果上下文限制了父目录,则进行检查 // 如果上下文限制了父目录,则进行检查
@ -46,14 +46,14 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i
// 检查目录 // 检查目录
for _, folder := range folders { for _, folder := range folders {
if *folder.ParentID != parent.ID { if *folder.ParentID != parent.ID {
return "", ErrObjectNotExist return ErrObjectNotExist
} }
} }
// 检查文件 // 检查文件
for _, file := range files { for _, file := range files {
if file.FolderID != parent.ID { if file.FolderID != parent.ID {
return "", ErrObjectNotExist return ErrObjectNotExist
} }
} }
} }
@ -73,25 +73,8 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i
files[i].Position = "" files[i].Position = ""
} }
// 创建临时压缩文件
saveFolder := "archive"
if !isArchive {
saveFolder = "compress"
}
zipFilePath := filepath.Join(
util.RelativePath(model.GetSettingByName("temp_path")),
saveFolder,
fmt.Sprintf("archive_%d.zip", time.Now().UnixNano()),
)
zipFile, err := util.CreatNestedFile(zipFilePath)
if err != nil {
util.Log().Warning("%s", err)
return "", err
}
defer zipFile.Close()
// 创建压缩文件Writer // 创建压缩文件Writer
zipWriter := zip.NewWriter(zipFile) zipWriter := zip.NewWriter(writer)
defer zipWriter.Close() defer zipWriter.Close()
ctx = reqContext ctx = reqContext
@ -101,10 +84,9 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i
select { select {
case <-reqContext.Done(): case <-reqContext.Done():
// 取消压缩请求 // 取消压缩请求
fs.cancelCompress(ctx, zipWriter, zipFile, zipFilePath) return ErrClientCanceled
return "", ErrClientCanceled
default: default:
fs.doCompress(ctx, nil, &folders[i], zipWriter, isArchive) fs.doCompress(reqContext, nil, &folders[i], zipWriter, isArchive)
} }
} }
@ -112,22 +94,13 @@ func (fs *FileSystem) Compress(ctx context.Context, folderIDs, fileIDs []uint, i
select { select {
case <-reqContext.Done(): case <-reqContext.Done():
// 取消压缩请求 // 取消压缩请求
fs.cancelCompress(ctx, zipWriter, zipFile, zipFilePath) return ErrClientCanceled
return "", ErrClientCanceled
default: default:
fs.doCompress(ctx, &files[i], nil, zipWriter, isArchive) fs.doCompress(reqContext, &files[i], nil, zipWriter, isArchive)
} }
} }
return zipFilePath, nil return nil
}
// cancelCompress 取消压缩进程
func (fs *FileSystem) cancelCompress(ctx context.Context, zipWriter *zip.Writer, file *os.File, path string) {
util.Log().Debug("客户端取消压缩请求")
zipWriter.Close()
file.Close()
_ = os.Remove(path)
} }
func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *model.Folder, zipWriter *zip.Writer, isArchive bool) { func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *model.Folder, zipWriter *zip.Writer, isArchive bool) {

@ -3,7 +3,10 @@ package task
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"path/filepath"
"time"
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
@ -93,20 +96,37 @@ func (job *CompressTask) Do() {
util.Log().Debug("开始压缩文件") util.Log().Debug("开始压缩文件")
job.TaskModel.SetProgress(CompressingProgress) job.TaskModel.SetProgress(CompressingProgress)
// 创建临时压缩文件
saveFolder := "compress"
zipFilePath := filepath.Join(
util.RelativePath(model.GetSettingByName("temp_path")),
saveFolder,
fmt.Sprintf("archive_%d.zip", time.Now().UnixNano()),
)
zipFile, err := util.CreatNestedFile(zipFilePath)
if err != nil {
util.Log().Warning("%s", err)
job.SetErrorMsg(err.Error())
return
}
defer zipFile.Close()
// 开始压缩 // 开始压缩
ctx := context.Background() ctx := context.Background()
zipFile, err := fs.Compress(ctx, job.TaskProps.Dirs, job.TaskProps.Files, false) err = fs.Compress(ctx, zipFile, job.TaskProps.Dirs, job.TaskProps.Files, false)
if err != nil { if err != nil {
job.SetErrorMsg(err.Error()) job.SetErrorMsg(err.Error())
return return
} }
job.zipPath = zipFile
job.zipPath = zipFilePath
zipFile.Close()
util.Log().Debug("压缩文件存放至%s开始上传", zipFile) util.Log().Debug("压缩文件存放至%s开始上传", zipFile)
job.TaskModel.SetProgress(TransferringProgress) job.TaskModel.SetProgress(TransferringProgress)
// 上传文件 // 上传文件
err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, 0) err = fs.UploadFromPath(ctx, zipFilePath, job.TaskProps.Dst, 0)
if err != nil { if err != nil {
job.SetErrorMsg(err.Error()) job.SetErrorMsg(err.Error())
return return

@ -18,12 +18,9 @@ func DownloadArchive(c *gin.Context) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
var service explorer.DownloadService var service explorer.ArchiveService
if err := c.ShouldBindUri(&service); err == nil { if err := c.ShouldBindUri(&service); err == nil {
res := service.DownloadArchived(ctx, c) service.DownloadArchived(ctx, c)
if res.Code != 0 {
c.JSON(200, res)
}
} else { } else {
c.JSON(200, ErrorResponse(err)) c.JSON(200, ErrorResponse(err))
} }
@ -189,7 +186,7 @@ func Preview(c *gin.Context) {
res := service.PreviewContent(ctx, c, false) res := service.PreviewContent(ctx, c, false)
// 是否需要重定向 // 是否需要重定向
if res.Code == -301 { if res.Code == -301 {
c.Redirect(301, res.Data.(string)) c.Redirect(302, res.Data.(string))
return return
} }
// 是否有错误发生 // 是否有错误发生

@ -100,7 +100,7 @@ func PreviewShare(c *gin.Context) {
res := service.PreviewContent(ctx, c, false) res := service.PreviewContent(ctx, c, false)
// 是否需要重定向 // 是否需要重定向
if res.Code == -301 { if res.Code == -301 {
c.Redirect(301, res.Data.(string)) c.Redirect(302, res.Data.(string))
return return
} }
// 是否有错误发生 // 是否有错误发生

@ -34,6 +34,8 @@ func InitSlaveRouter() *gin.Engine {
v3.Use(middleware.SignRequired(auth.General)) v3.Use(middleware.SignRequired(auth.General))
// 主机信息解析 // 主机信息解析
v3.Use(middleware.MasterMetadata()) v3.Use(middleware.MasterMetadata())
// 禁止缓存
v3.Use(middleware.CacheControl())
/* /*
@ -133,8 +135,12 @@ func InitMasterRouter() *gin.Engine {
if gin.Mode() == gin.TestMode { if gin.Mode() == gin.TestMode {
v3.Use(middleware.MockHelper()) v3.Use(middleware.MockHelper())
} }
// 用户会话
v3.Use(middleware.CurrentUser()) v3.Use(middleware.CurrentUser())
// 禁止缓存
v3.Use(middleware.CacheControl())
/* /*
*/ */
@ -205,10 +211,10 @@ func InitMasterRouter() *gin.Engine {
file.GET("get/:id/:name", controllers.AnonymousGetContent) file.GET("get/:id/:name", controllers.AnonymousGetContent)
// 文件外链(301跳转) // 文件外链(301跳转)
file.GET("source/:id/:name", controllers.AnonymousPermLink) file.GET("source/:id/:name", controllers.AnonymousPermLink)
// 下載已经打包好的文件
file.GET("archive/:id/archive.zip", controllers.DownloadArchive)
// 下载文件 // 下载文件
file.GET("download/:id", controllers.Download) file.GET("download/:id", controllers.Download)
// 打包并下载文件
file.GET("archive/:sessionID/archive.zip", controllers.DownloadArchive)
} }
} }

@ -12,7 +12,6 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"time"
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/cache"
@ -42,6 +41,11 @@ type DownloadService struct {
ID string `uri:"id" binding:"required"` ID string `uri:"id" binding:"required"`
} }
// ArchiveService 文件流式打包下載服务
type ArchiveService struct {
ID string `uri:"sessionID" binding:"required"`
}
// New 创建新文件 // New 创建新文件
func (service *SingleFileService) Create(c *gin.Context) serializer.Response { func (service *SingleFileService) Create(c *gin.Context) serializer.Response {
// 创建文件系统 // 创建文件系统
@ -93,8 +97,14 @@ func (service *SlaveListService) List(c *gin.Context) serializer.Response {
return serializer.Response{Data: string(res)} return serializer.Response{Data: string(res)}
} }
// DownloadArchived 下載已打包的多文件 // DownloadArchived 通过预签名 URL 打包下载
func (service *DownloadService) DownloadArchived(ctx context.Context, c *gin.Context) serializer.Response { func (service *ArchiveService) DownloadArchived(ctx context.Context, c *gin.Context) serializer.Response {
userRaw, exist := cache.Get("archive_user_" + service.ID)
if !exist {
return serializer.Err(404, "归档会话不存在", nil)
}
user := userRaw.(model.User)
// 创建文件系统 // 创建文件系统
fs, err := filesystem.NewFileSystemFromContext(c) fs, err := filesystem.NewFileSystemFromContext(c)
if err != nil { if err != nil {
@ -103,31 +113,28 @@ func (service *DownloadService) DownloadArchived(ctx context.Context, c *gin.Con
defer fs.Recycle() defer fs.Recycle()
// 查找打包的临时文件 // 查找打包的临时文件
zipPath, exist := cache.Get("archive_" + service.ID) archiveSession, exist := cache.Get("archive_" + service.ID)
if !exist { if !exist {
return serializer.Err(404, "归档文件不存在", nil) return serializer.Err(404, "归档会话不存在", nil)
} }
// 获取文件流 // 清理打包会话
rs, err := fs.GetPhysicalFileContent(ctx, zipPath.(string)) _ = cache.Deletes([]string{service.ID, "user_" + service.ID}, "archive_")
defer rs.Close()
if err != nil {
return serializer.Err(serializer.CodeNotSet, err.Error(), err)
}
if fs.User.Group.OptionsSerialized.OneTimeDownload {
// 清理资源,删除临时文件
_ = cache.Deletes([]string{service.ID}, "archive_")
}
// 开始打包
c.Header("Content-Disposition", "attachment;") c.Header("Content-Disposition", "attachment;")
c.Header("Content-Type", "application/zip") c.Header("Content-Type", "application/zip")
http.ServeContent(c.Writer, c.Request, "", time.Now(), rs) itemService := archiveSession.(ItemIDService)
items := itemService.Raw()
ctx = context.WithValue(ctx, fsctx.GinCtx, c)
err = fs.Compress(ctx, c.Writer, items.Dirs, items.Items, true)
if err != nil {
return serializer.Err(serializer.CodeNotSet, "无法创建压缩文件", err)
}
return serializer.Response{ return serializer.Response{
Code: 0, Code: 0,
} }
} }
// Download 签名的匿名文件下载 // Download 签名的匿名文件下载
@ -261,7 +268,7 @@ func (service *FileIDService) CreateDownloadSession(ctx context.Context, c *gin.
// Download 通过签名URL的文件下载无需登录 // Download 通过签名URL的文件下载无需登录
func (service *DownloadService) Download(ctx context.Context, c *gin.Context) serializer.Response { func (service *DownloadService) Download(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统 // 创建文件系统
fs, err := filesystem.NewFileSystemFromContext(c) fs, err := filesystem.NewFileSystem(&user)
if err != nil { if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
} }

@ -2,9 +2,9 @@ package explorer
import ( import (
"context" "context"
"encoding/gob"
"fmt" "fmt"
"math" "math"
"net/url"
"path" "path"
"strings" "strings"
"time" "time"
@ -13,7 +13,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/hashid" "github.com/cloudreve/Cloudreve/v3/pkg/hashid"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/task" "github.com/cloudreve/Cloudreve/v3/pkg/task"
@ -67,6 +66,10 @@ type ItemPropertyService struct {
IsFolder bool `form:"is_folder"` IsFolder bool `form:"is_folder"`
} }
func init() {
gob.Register(ItemIDService{})
}
// Raw 批量解码HashID获取原始ID // Raw 批量解码HashID获取原始ID
func (service *ItemIDService) Raw() *ItemService { func (service *ItemIDService) Raw() *ItemService {
if service.Source != nil { if service.Source != nil {
@ -232,37 +235,20 @@ func (service *ItemIDService) Archive(ctx context.Context, c *gin.Context) seria
return serializer.Err(serializer.CodeGroupNotAllowed, "当前用户组无法进行此操作", nil) return serializer.Err(serializer.CodeGroupNotAllowed, "当前用户组无法进行此操作", nil)
} }
// 开始压缩 // 创建打包下载会话
ctx = context.WithValue(ctx, fsctx.GinCtx, c)
items := service.Raw()
zipFile, err := fs.Compress(ctx, items.Dirs, items.Items, true)
if err != nil {
return serializer.Err(serializer.CodeNotSet, "无法创建压缩文件", err)
}
// 生成一次性压缩文件下载地址
siteURL, err := url.Parse(model.GetSettingByName("siteURL"))
if err != nil {
return serializer.Err(serializer.CodeNotSet, "无法解析站点URL", err)
}
zipID := util.RandStringRunes(16)
ttl := model.GetIntSetting("archive_timeout", 30) ttl := model.GetIntSetting("archive_timeout", 30)
signedURI, err := auth.SignURI( downloadSessionID := util.RandStringRunes(16)
cache.Set("archive_"+downloadSessionID, *service, ttl)
cache.Set("archive_user_"+downloadSessionID, *fs.User, ttl)
signURL, err := auth.SignURI(
auth.General, auth.General,
fmt.Sprintf("/api/v3/file/archive/%s/archive.zip", zipID), fmt.Sprintf("/api/v3/file/archive/%s/archive.zip", downloadSessionID),
time.Now().Unix()+int64(ttl), int64(ttl),
) )
finalURL := siteURL.ResolveReference(signedURI).String()
// 将压缩文件记录存入缓存
err = cache.Set("archive_"+zipID, zipFile, ttl)
if err != nil {
return serializer.Err(serializer.CodeIOFailed, "无法写入缓存", err)
}
return serializer.Response{ return serializer.Response{
Code: 0, Code: 0,
Data: finalURL, Data: signURL.String(),
} }
} }

Loading…
Cancel
Save