Feat: upload session recycle crontab job / API for cleanup all upload session

pull/1107/head
HFO4 2 years ago
parent 3444b4a75e
commit 7dd636da74

@ -139,6 +139,19 @@ func GetChildFilesOfFolders(folders *[]Folder) ([]File, error) {
return files, result.Error
}
// GetUploadPlaceholderFiles 获取所有上传占位文件
// UID为0表示忽略用户
func GetUploadPlaceholderFiles(uid uint) []*File {
query := DB
if uid != 0 {
query = query.Where("user_id = ?", uid)
}
var files []*File
query.Where("upload_session_id is not NULL").Find(&files)
return files
}
// GetPolicy 获取文件所属策略
func (file *File) GetPolicy() *Policy {
if file.Policy.Model.ID == 0 {

@ -162,6 +162,7 @@ Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; verti
{Name: "home_view_method", Value: "icon", Type: "view"},
{Name: "share_view_method", Value: "list", Type: "view"},
{Name: "cron_garbage_collect", Value: "@hourly", Type: "cron"},
{Name: "cron_recycle_upload_session", Value: "@every 1h30m", Type: "cron"},
{Name: "authn_enabled", Value: "0", Type: "authn"},
{Name: "captcha_type", Value: "normal", Type: "captcha"},
{Name: "captcha_height", Value: "60", Type: "captcha"},

@ -216,13 +216,7 @@ func (policy *Policy) IsThumbExist(name string) bool {
// IsTransitUpload 返回此策略上传给定size文件时是否需要服务端中转
func (policy *Policy) IsTransitUpload(size uint64) bool {
if policy.Type == "local" {
return true
}
if policy.Type == "onedrive" && size < 4*1024*1024 {
return true
}
return false
return policy.Type == "local"
}
// IsPathGenerateNeeded 返回此策略是否需要在生成上传凭证时生成存储路径

@ -1,6 +1,7 @@
package crontab
import (
"context"
"os"
"path/filepath"
"strings"
@ -8,6 +9,7 @@ import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
@ -53,3 +55,45 @@ func collectCache(store *cache.MemoStore) {
util.Log().Debug("清理内存缓存")
store.GarbageCollect()
}
func uploadSessionCollect() {
placeholders := model.GetUploadPlaceholderFiles(0)
// 将过期的上传会话按照用户分组
userToFiles := make(map[uint][]uint)
for _, file := range placeholders {
_, sessionExist := cache.Get(filesystem.UploadSessionCachePrefix + *file.UploadSessionID)
if sessionExist {
continue
}
if _, ok := userToFiles[file.UserID]; !ok {
userToFiles[file.UserID] = make([]uint, 0)
}
userToFiles[file.UserID] = append(userToFiles[file.UserID], file.ID)
}
// 删除过期的会话
for uid, filesIDs := range userToFiles {
user, err := model.GetUserByID(uid)
if err != nil {
util.Log().Warning("上传会话所属用户不存在, %s", err)
continue
}
fs, err := filesystem.NewFileSystem(&user)
if err != nil {
util.Log().Warning("无法初始化文件系统, %s", err)
continue
}
if err = fs.Delete(context.Background(), []uint{}, filesIDs, false); err != nil {
util.Log().Warning("无法删除上传会话, %s", err)
}
fs.Recycle()
}
util.Log().Info("定时任务 [cron_recycle_upload_session] 执行完毕")
}

@ -21,13 +21,18 @@ func Reload() {
func Init() {
util.Log().Info("初始化定时任务...")
// 读取cron日程设置
options := model.GetSettingByNames("cron_garbage_collect")
options := model.GetSettingByNames(
"cron_garbage_collect",
"cron_recycle_upload_session",
)
Cron := cron.New()
for k, v := range options {
var handler func()
switch k {
case "cron_garbage_collect":
handler = garbageCollect
case "cron_recycle_upload_session":
handler = uploadSessionCollect
default:
util.Log().Warning("未知定时任务类型 [%s],跳过", k)
continue

@ -365,6 +365,11 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}
// Meta 获取文件信息
func (handler Driver) Meta(ctx context.Context, path string) (*MetaData, error) {
res, err := handler.Client.Object.Head(ctx, path, &cossdk.ObjectHeadOptions{})

@ -32,6 +32,9 @@ type Handler interface {
// Token 获取有效期为ttl的上传凭证和签名
Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error)
// CancelToken 取消已经创建的有状态上传凭证
CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error
// List 递归列取远程端path路径下文件、目录不包含path本身
// 返回的对象路径以path作为起始根目录.
// recursive - 是否递归列出

@ -260,3 +260,8 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria
ChunkSize: handler.Policy.OptionsSerialized.ChunkSize,
}, nil
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -249,3 +249,8 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria
Token: apiURL.String(),
}, nil
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -463,3 +463,8 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli
Token: signature,
}, nil
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -308,3 +308,8 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy storage.Pu
Token: upToken,
}, nil
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -345,3 +345,8 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy serializer
}
return serializer.UploadCredential{}, errors.New("无法签名上传策略")
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -446,3 +446,8 @@ func (handler Driver) CORS() error {
return err
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -54,3 +54,8 @@ func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer
func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
return nil, ErrNotImplemented
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -120,3 +120,8 @@ func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer
func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
return nil, ErrNotImplemented
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -335,6 +335,11 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria
return handler.getUploadCredential(ctx, putPolicy)
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}
func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy) (serializer.UploadCredential, error) {
// 生成上传策略
policyJSON, err := json.Marshal(policy)

@ -5,6 +5,7 @@ import (
"io"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
@ -177,23 +178,45 @@ func (fs *FileSystem) deleteGroupedFile(ctx context.Context, files map[uint][]*m
for policyID, toBeDeletedFiles := range files {
// 列举出需要物理删除的文件的物理路径
sourceNames := make([]string, 0, len(toBeDeletedFiles))
sourceNamesAll := make([]string, 0, len(toBeDeletedFiles))
sourceNamesDeleted := make([]string, 0, len(toBeDeletedFiles))
sourceNamesTryDeleted := make([]string, 0, len(toBeDeletedFiles))
for i := 0; i < len(toBeDeletedFiles); i++ {
sourceNames = append(sourceNames, toBeDeletedFiles[i].SourceName)
sourceNamesAll = append(sourceNamesAll, toBeDeletedFiles[i].SourceName)
if !(toBeDeletedFiles[i].UploadSessionID != nil && toBeDeletedFiles[i].Size == 0) {
sourceNamesDeleted = append(sourceNamesDeleted, toBeDeletedFiles[i].SourceName)
} else {
sourceNamesTryDeleted = append(sourceNamesTryDeleted, toBeDeletedFiles[i].SourceName)
}
if toBeDeletedFiles[i].UploadSessionID != nil {
if session, ok := cache.Get(UploadSessionCachePrefix + *toBeDeletedFiles[i].UploadSessionID); ok {
uploadSession := session.(serializer.UploadSession)
if err := fs.Handler.CancelToken(ctx, &uploadSession); err != nil {
util.Log().Warning("无法取消 [%s] 的上传会话: %s", err)
}
cache.Deletes([]string{*toBeDeletedFiles[i].UploadSessionID}, UploadSessionCachePrefix)
}
}
}
// 切换上传策略
fs.Policy = toBeDeletedFiles[0].GetPolicy()
err := fs.DispatchHandler()
if err != nil {
failed[policyID] = sourceNames
failed[policyID] = sourceNamesAll
continue
}
// 执行删除
failedFile, _ := fs.Handler.Delete(ctx, sourceNames)
failedFile, _ := fs.Handler.Delete(ctx, sourceNamesDeleted)
failed[policyID] = failedFile
// 尝试删除上传会话中大小为0的占位文件。如果失败也忽略
fs.Handler.Delete(ctx, sourceNamesTryDeleted)
}
return failed
@ -208,7 +231,7 @@ func (fs *FileSystem) GroupFileByPolicy(ctx context.Context, files []model.File)
// 如果已存在分组,直接追加
policyGroup[files[key].PolicyID] = append(file, &files[key])
} else {
// 分不存在,创建
// 分不存在,创建
policyGroup[files[key].PolicyID] = make([]*model.File, 0)
policyGroup[files[key].PolicyID] = append(policyGroup[files[key].PolicyID], &files[key])
}

@ -185,6 +185,7 @@ func (fs *FileSystem) Delete(ctx context.Context, dirs, files []uint, force bool
}
// 删除文件记录对应的分享记录
// TODO 先取消分享再删除文件
model.DeleteShareBySourceIDs(deletedFileIDs, false)
// 归还容量

@ -343,13 +343,38 @@ func FileUpload(c *gin.Context) {
//})
}
// DeleteUploadCredential 删除上传会话
func DeleteUploadCredential(c *gin.Context) {
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var service explorer.UploadSessionService
if err := c.ShouldBindUri(&service); err == nil {
res := service.Delete(ctx, c)
c.JSON(200, res)
} else {
c.JSON(200, ErrorResponse(err))
}
}
// DeleteAllCredential 删除全部上传会话
func DeleteAllCredential(c *gin.Context) {
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
res := explorer.DeleteAllUploadSession(ctx, c)
c.JSON(200, res)
}
// GetUploadCredential 创建上传会话
func GetUploadCredential(c *gin.Context) {
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var service explorer.UploadSessionService
var service explorer.CreateUploadSessionService
if err := c.ShouldBindJSON(&service); err == nil {
res := service.Create(ctx, c)
c.JSON(200, res)

@ -504,10 +504,18 @@ func InitMasterRouter() *gin.Engine {
// 文件
file := auth.Group("file", middleware.HashID(hashid.FileID))
{
// 文件上传
file.POST("upload/:sessionId/:index", controllers.FileUpload)
// 创建上传会话
file.PUT("upload", controllers.GetUploadCredential)
// 上传
upload := file.Group("upload")
{
// 文件上传
upload.POST(":sessionId/:index", controllers.FileUpload)
// 创建上传会话
upload.PUT("", controllers.GetUploadCredential)
// 删除给定上传会话
upload.DELETE(":sessionId", controllers.DeleteUploadCredential)
// 删除全部上传会话
upload.DELETE("", controllers.DeleteAllCredential)
}
// 更新文件
file.PUT("update/:id", controllers.PutContent)
// 创建空白文件

@ -17,8 +17,8 @@ import (
"time"
)
// UploadSessionService 获取上传凭证服务
type UploadSessionService struct {
// CreateUploadSessionService 获取上传凭证服务
type CreateUploadSessionService struct {
Path string `json:"path" binding:"required"`
Size uint64 `json:"size" binding:"min=0"`
Name string `json:"name" binding:"required"`
@ -27,7 +27,7 @@ type UploadSessionService struct {
}
// Create 创建新的上传会话
func (service *UploadSessionService) Create(ctx context.Context, c *gin.Context) serializer.Response {
func (service *CreateUploadSessionService) Create(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewFileSystemFromContext(c)
if err != nil {
@ -84,6 +84,10 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
if uploadSession.UID != fs.User.ID {
return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session expired or not exist", nil)
}
// 查找上传会话创建的占位文件
file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID)
if err != nil {
@ -91,6 +95,10 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial
}
// 重设 fs 存储策略
if !uploadSession.Policy.IsTransitUpload(uploadSession.Size) {
return serializer.Err(serializer.CodePolicyNotAllowed, "Storage policy not supported", err)
}
fs.Policy = &uploadSession.Policy
if err := fs.DispatchHandler(); err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, "Unknown storage policy", err)
@ -169,3 +177,55 @@ func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.File
return serializer.Response{}
}
// UploadSessionService 上传会话服务
type UploadSessionService struct {
ID string `uri:"sessionId" binding:"required"`
}
// Delete 删除指定上传会话
func (service *UploadSessionService) Delete(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewFileSystemFromContext(c)
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 查找需要删除的上传会话的占位文件
file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID)
if err != nil {
return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session file placeholder not exist", err)
}
// 删除文件
if err := fs.Delete(ctx, []uint{}, []uint{file.ID}, false); err != nil {
return serializer.Err(serializer.CodeInternalSetting, "Failed to delete upload session", err)
}
return serializer.Response{}
}
// DeleteAllUploadSession 删除当前用户的全部上传绘会话
func DeleteAllUploadSession(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewFileSystemFromContext(c)
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 查找需要删除的上传会话的占位文件
files := model.GetUploadPlaceholderFiles(fs.User.ID)
fileIDs := make([]uint, len(files))
for i, file := range files {
fileIDs[i] = file.ID
}
// 删除文件
if err := fs.Delete(ctx, []uint{}, fileIDs, false); err != nil {
return serializer.Err(serializer.CodeInternalSetting, "Failed to cleanup upload session", err)
}
return serializer.Response{}
}

Loading…
Cancel
Save