diff --git a/application/dependency/dependency.go b/application/dependency/dependency.go index e8319051..82e2d5a1 100644 --- a/application/dependency/dependency.go +++ b/application/dependency/dependency.go @@ -562,7 +562,7 @@ func (d *dependency) IoIntenseQueue(ctx context.Context) queue.Queue { queue.WithWorkerCount(queueSetting.WorkerNum), queue.WithName("IoIntenseQueue"), queue.WithMaxTaskExecution(queueSetting.MaxExecution), - queue.WithResumeTaskType(queue.CreateArchiveTaskType, queue.ExtractArchiveTaskType, queue.RelocateTaskType), + queue.WithResumeTaskType(queue.CreateArchiveTaskType, queue.ExtractArchiveTaskType, queue.RelocateTaskType, queue.ImportTaskType), queue.WithTaskPullInterval(10*time.Second), ) return d.ioIntenseQueue diff --git a/assets b/assets index c4d4d3aa..815f5857 160000 --- a/assets +++ b/assets @@ -1 +1 @@ -Subproject commit c4d4d3aa6f28e04a5828f3b4b4453d239746bed0 +Subproject commit 815f5857f0c673b81d4d39663b39278badaa626c diff --git a/inventory/file.go b/inventory/file.go index 34b72718..70e48620 100644 --- a/inventory/file.go +++ b/inventory/file.go @@ -120,6 +120,7 @@ type ( Source string Size int64 UploadSessionID uuid.UUID + Importing bool } RelocateEntityParameter struct { @@ -716,6 +717,11 @@ func (f *fileClient) CreateFile(ctx context.Context, root *ent.File, args *Creat SetParent(root). SetIsSymbolic(args.IsSymbolic). SetStoragePoliciesID(args.StoragePolicyID) + + if args.EntityParameters != nil && args.EntityParameters.Importing { + stm.SetSize(args.EntityParameters.Size) + } + newFile, err := stm.Save(ctx) if err != nil { return nil, nil, nil, fmt.Errorf("failed to create file: %v", err) @@ -730,6 +736,12 @@ func (f *fileClient) CreateFile(ctx context.Context, root *ent.File, args *Creat if err != nil { return nil, nil, storageDiff, fmt.Errorf("failed to create default entity: %v", err) } + + if args.EntityParameters.Importing { + if err := f.client.File.UpdateOne(newFile).SetPrimaryEntity(defaultEntity.ID).Exec(ctx); err != nil { + return nil, nil, storageDiff, fmt.Errorf("failed to set primary entity: %v", err) + } + } } // Create metadata if needed @@ -848,7 +860,7 @@ func (f *fileClient) CreateEntity(ctx context.Context, file *ent.File, args *Ent stm.SetUpdatedAt(*args.ModifiedAt) } - if args.UploadSessionID != uuid.Nil { + if args.UploadSessionID != uuid.Nil && !args.Importing { stm.SetUploadSessionID(args.UploadSessionID) } diff --git a/pkg/cluster/routes/routes.go b/pkg/cluster/routes/routes.go index d4bdf0a8..47dc8705 100644 --- a/pkg/cluster/routes/routes.go +++ b/pkg/cluster/routes/routes.go @@ -185,6 +185,15 @@ func SlaveMediaMetaRoute(src, ext string) string { return fmt.Sprintf("file/meta/%s/%s", src, url.PathEscape(ext)) } +func SlaveFileListRoute(srcPath string, recursive bool) string { + base := "file/list" + query := url.Values{} + query.Set("recursive", strconv.FormatBool(recursive)) + query.Set("path", srcPath) + route, _ := url.Parse(constants.APIPrefixSlave + fmt.Sprintf("%s?%s", base, query.Encode())) + return route.String() +} + func SlaveThumbUrl(base *url.URL, srcPath, ext string) *url.URL { srcPath = url.PathEscape(base64.URLEncoding.EncodeToString([]byte(srcPath))) ext = url.PathEscape(ext) diff --git a/pkg/filemanager/driver/cos/cos.go b/pkg/filemanager/driver/cos/cos.go index 186b6f30..89e83880 100644 --- a/pkg/filemanager/driver/cos/cos.go +++ b/pkg/filemanager/driver/cos/cos.go @@ -8,6 +8,9 @@ import ( "net/http" "net/url" "os" + "path" + "path/filepath" + "strings" "time" "github.com/cloudreve/Cloudreve/v4/ent" @@ -126,80 +129,88 @@ func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provid return driver, nil } -// -//// List 列出COS文件 -//func (handler Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) { -// // 初始化列目录参数 -// opt := &cossdk.BucketGetOptions{ -// Prefix: strings.TrimPrefix(base, "/"), -// EncodingType: "", -// MaxKeys: 1000, -// } -// // 是否为递归列出 -// if !recursive { -// opt.Delimiter = "/" -// } -// // 手动补齐结尾的slash -// if opt.Prefix != "" { -// opt.Prefix += "/" -// } -// -// var ( -// marker string -// objects []cossdk.Object -// commons []string -// ) -// -// for { -// res, _, err := handler.client.Bucket.Get(ctx, opt) -// if err != nil { -// return nil, err -// } -// objects = append(objects, res.Contents...) -// commons = append(commons, res.CommonPrefixes...) -// // 如果本次未列取完,则继续使用marker获取结果 -// marker = res.NextMarker -// // marker 为空时结果列取完毕,跳出 -// if marker == "" { -// break -// } -// } -// -// // 处理列取结果 -// res := make([]response.Object, 0, len(objects)+len(commons)) -// // 处理目录 -// for _, object := range commons { -// rel, err := filepath.Rel(opt.Prefix, object) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(object), -// RelativePath: filepath.ToSlash(rel), -// Size: 0, -// IsDir: true, -// LastModify: time.Now(), -// }) -// } -// // 处理文件 -// for _, object := range objects { -// rel, err := filepath.Rel(opt.Prefix, object.Key) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(object.Key), -// Source: object.Key, -// RelativePath: filepath.ToSlash(rel), -// Size: uint64(object.Size), -// IsDir: false, -// LastModify: time.Now(), -// }) -// } -// -// return res, nil -// -//} +func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + // 初始化列目录参数 + opt := &cossdk.BucketGetOptions{ + Prefix: strings.TrimPrefix(base, "/"), + EncodingType: "", + MaxKeys: 1000, + } + + // 是否为递归列出 + + if !recursive { + opt.Delimiter = "/" + } + + // 手动补齐结尾的slash + if opt.Prefix != "" { + opt.Prefix += "/" + } + + var ( + marker string + objects []cossdk.Object + commons []string + ) + + for { + res, _, err := handler.client.Bucket.Get(ctx, opt) + if err != nil { + handler.l.Warning("Failed to list objects: %s", err) + return nil, err + } + objects = append(objects, res.Contents...) + commons = append(commons, res.CommonPrefixes...) + // 如果本次未列取完,则继续使用marker获取结果 + marker = res.NextMarker + // marker 为空时结果列取完毕,跳出 + if marker == "" { + break + } + } + + // 处理列取结果 + res := make([]fs.PhysicalObject, 0, len(objects)+len(commons)) + // 处理目录 + + for _, object := range commons { + rel, err := filepath.Rel(opt.Prefix, object) + if err != nil { + handler.l.Warning("Failed to get relative path: %s", err) + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object), + RelativePath: filepath.ToSlash(rel), + Size: 0, + IsDir: true, + LastModify: time.Now(), + }) + } + onProgress(len(commons)) + + // 处理文件 + + for _, object := range objects { + rel, err := filepath.Rel(opt.Prefix, object.Key) + if err != nil { + handler.l.Warning("Failed to get relative path: %s", err) + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object.Key), + Source: object.Key, + RelativePath: filepath.ToSlash(rel), + Size: object.Size, + IsDir: false, + LastModify: time.Now(), + }) + } + onProgress(len(res)) + + return res, nil +} // CORS 创建跨域策略 func (handler Driver) CORS() error { diff --git a/pkg/filemanager/driver/handler.go b/pkg/filemanager/driver/handler.go index 8bfc6c67..21b0ec5f 100644 --- a/pkg/filemanager/driver/handler.go +++ b/pkg/filemanager/driver/handler.go @@ -2,6 +2,7 @@ package driver import ( "context" + "encoding/gob" "os" "time" @@ -76,7 +77,7 @@ type ( // List 递归列取远程端path路径下文件、目录,不包含path本身, // 返回的对象路径以path作为起始根目录. // recursive - 是否递归列出 - // List(ctx context.Context, path string, recursive bool) ([]response.Object, error) + List(ctx context.Context, base string, onProgress ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) // Capabilities returns the capabilities of this handler Capabilities() *Capabilities @@ -108,6 +109,8 @@ type ( // BrowserRelayedDownload indicates whether to relay download via stream-saver. BrowserRelayedDownload bool } + + ListProgressFunc func(int) ) const ( @@ -122,3 +125,7 @@ type ForceUsePublicEndpointCtx struct{} func WithForcePublicEndpoint(ctx context.Context, value bool) context.Context { return context.WithValue(ctx, ForceUsePublicEndpointCtx{}, value) } + +func init() { + gob.Register(fs.PhysicalObject{}) +} diff --git a/pkg/filemanager/driver/local/local.go b/pkg/filemanager/driver/local/local.go index 88949b63..5fb4a1f2 100644 --- a/pkg/filemanager/driver/local/local.go +++ b/pkg/filemanager/driver/local/local.go @@ -58,51 +58,53 @@ func New(p *ent.StoragePolicy, l logging.Logger, config conf.ConfigProvider) *Dr } } -//// List 递归列取给定物理路径下所有文件 -//func (handler *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { -// var res []response.Object -// -// // 取得起始路径 -// root := util.RelativePath(filepath.FromSlash(path)) -// -// // 开始遍历路径下的文件、目录 -// err := filepath.Walk(root, -// func(path string, info os.FileInfo, err error) error { -// // 跳过根目录 -// if path == root { -// return nil -// } -// -// if err != nil { -// util.Log().Warning("Failed to walk folder %q: %s", path, err) -// return filepath.SkipDir -// } -// -// // 将遍历对象的绝对路径转换为相对路径 -// rel, err := filepath.Rel(root, path) -// if err != nil { -// return err -// } -// -// res = append(res, response.Object{ -// Name: info.Name(), -// RelativePath: filepath.ToSlash(rel), -// Source: path, -// Size: uint64(info.Size()), -// IsDir: info.IsDir(), -// LastModify: info.ModTime(), -// }) -// -// // 如果非递归,则不步入目录 -// if !recursive && info.IsDir() { -// return filepath.SkipDir -// } -// -// return nil -// }) -// -// return res, err -//} +func (handler *Driver) List(ctx context.Context, path string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + var res []fs.PhysicalObject + root := handler.LocalPath(ctx, path) + + err := filepath.Walk(root, + func(path string, info os.FileInfo, err error) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Skip root directory + if path == root { + return nil + } + + if err != nil { + handler.l.Warning("Failed to walk folder %q: %s", path, err) + return filepath.SkipDir + } + + // Transform absolute path to relative path + rel, err := filepath.Rel(root, path) + if err != nil { + return err + } + + res = append(res, fs.PhysicalObject{ + Name: info.Name(), + RelativePath: filepath.ToSlash(rel), + Source: path, + Size: info.Size(), + IsDir: info.IsDir(), + LastModify: info.ModTime(), + }) + onProgress(1) + // If not recursive, do not enter directory + if !recursive && info.IsDir() { + return filepath.SkipDir + } + + return nil + }) + + return res, err +} // Get 获取文件内容 func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) { diff --git a/pkg/filemanager/driver/obs/obs.go b/pkg/filemanager/driver/obs/obs.go index 93313f5b..4dadfcf7 100644 --- a/pkg/filemanager/driver/obs/obs.go +++ b/pkg/filemanager/driver/obs/obs.go @@ -9,7 +9,10 @@ import ( "io" "net/url" "os" + "path" + "path/filepath" "strconv" + "strings" "time" "github.com/cloudreve/Cloudreve/v4/ent" @@ -104,6 +107,89 @@ func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provid return driver, nil } +func (d *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + opt := &obs.ListObjectsInput{ + ListObjsInput: obs.ListObjsInput{ + Prefix: strings.TrimPrefix(base, "/"), + EncodingType: "", + MaxKeys: 1000, + }, + Bucket: d.policy.BucketName, + } + + if !recursive { + opt.Delimiter = "/" + } + + if opt.Prefix != "" { + opt.Prefix += "/" + } + + var ( + marker string + objects []obs.Content + commons []string + ) + + for { + res, err := d.obs.ListObjects(opt, obs.WithRequestContext(ctx)) + if err != nil { + d.l.Warning("Failed to list objects: %s", err) + return nil, err + } + objects = append(objects, res.Contents...) + commons = append(commons, res.CommonPrefixes...) + // 如果本次未列取完,则继续使用marker获取结果 + marker = res.NextMarker + // marker 为空时结果列取完毕,跳出 + if marker == "" { + break + } + } + + // 处理列取结果 + res := make([]fs.PhysicalObject, 0, len(objects)+len(commons)) + // 处理目录 + + for _, object := range commons { + rel, err := filepath.Rel(opt.Prefix, object) + if err != nil { + d.l.Warning("Failed to get relative path: %s", err) + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object), + RelativePath: filepath.ToSlash(rel), + Size: 0, + IsDir: true, + LastModify: time.Now(), + }) + } + onProgress(len(commons)) + + // 处理文件 + + for _, object := range objects { + rel, err := filepath.Rel(opt.Prefix, object.Key) + if err != nil { + d.l.Warning("Failed to get relative path: %s", err) + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object.Key), + Source: object.Key, + RelativePath: filepath.ToSlash(rel), + Size: object.Size, + IsDir: false, + LastModify: time.Now(), + }) + } + onProgress(len(res)) + + return res, nil + +} + func (d *Driver) Put(ctx context.Context, file *fs.UploadRequest) error { defer file.Close() diff --git a/pkg/filemanager/driver/onedrive/onedrive.go b/pkg/filemanager/driver/onedrive/onedrive.go index 34173651..4eeef076 100644 --- a/pkg/filemanager/driver/onedrive/onedrive.go +++ b/pkg/filemanager/driver/onedrive/onedrive.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "os" + "path" + "path/filepath" "strings" "time" @@ -23,14 +25,17 @@ import ( ) // Driver OneDrive 适配器 -type Driver struct { - policy *ent.StoragePolicy - client Client - settings setting.Provider - config conf.ConfigProvider - l logging.Logger - chunkSize int64 -} +type ( + Driver struct { + policy *ent.StoragePolicy + client Client + settings setting.Provider + config conf.ConfigProvider + l logging.Logger + chunkSize int64 + } + ListPathRealRootCtx struct{} +) var ( features = &boolset.BooleanSet{} @@ -66,50 +71,52 @@ func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provid }, nil } -//// List 列取项目 -//func (handler *Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) { -// base = strings.TrimPrefix(base, "/") -// // 列取子项目 -// objects, _ := handler.client.ListChildren(ctx, base) -// -// // 获取真实的列取起始根目录 -// rootPath := base -// if realBase, ok := ctx.Value(fsctx.PathCtx).(string); ok { -// rootPath = realBase -// } else { -// ctx = context.WithValue(ctx, fsctx.PathCtx, base) -// } -// -// // 整理结果 -// res := make([]response.Object, 0, len(objects)) -// for _, object := range objects { -// source := path.Join(base, object.Name) -// rel, err := filepath.Rel(rootPath, source) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: object.Name, -// RelativePath: filepath.ToSlash(rel), -// Source: source, -// Size: uint64(object.Size), -// IsDir: object.Folder != nil, -// LastModify: time.Now(), -// }) -// } -// -// // 递归列取子目录 -// if recursive { -// for _, object := range objects { -// if object.Folder != nil { -// sub, _ := handler.List(ctx, path.Join(base, object.Name), recursive) -// res = append(res, sub...) -// } -// } -// } -// -// return res, nil -//} +// List 列取项目 +func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + base = strings.TrimPrefix(base, "/") + // 列取子项目 + objects, _ := handler.client.ListChildren(ctx, base) + + // 获取真实的列取起始根目录 + rootPath := base + if realBase, ok := ctx.Value(ListPathRealRootCtx{}).(string); ok { + rootPath = realBase + } else { + ctx = context.WithValue(ctx, ListPathRealRootCtx{}, base) + } + + // 整理结果 + res := make([]fs.PhysicalObject, 0, len(objects)) + for _, object := range objects { + source := path.Join(base, object.Name) + rel, err := filepath.Rel(rootPath, source) + if err != nil { + continue + } + res = append(res, fs.PhysicalObject{ + Name: object.Name, + RelativePath: filepath.ToSlash(rel), + Source: source, + Size: object.Size, + IsDir: object.Folder != nil, + LastModify: time.Now(), + }) + } + + onProgress(len(objects)) + + // 递归列取子目录 + if recursive { + for _, object := range objects { + if object.Folder != nil { + sub, _ := handler.List(ctx, path.Join(base, object.Name), onProgress, recursive) + res = append(res, sub...) + } + } + } + + return res, nil +} func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) { return nil, errors.New("not implemented") diff --git a/pkg/filemanager/driver/oss/oss.go b/pkg/filemanager/driver/oss/oss.go index dcad2757..5575e7be 100644 --- a/pkg/filemanager/driver/oss/oss.go +++ b/pkg/filemanager/driver/oss/oss.go @@ -9,6 +9,8 @@ import ( "io" "net/url" "os" + "path" + "path/filepath" "strconv" "strings" "time" @@ -154,72 +156,75 @@ func (handler *Driver) InitOSSClient(forceUsePublicEndpoint bool) error { return nil } -//// List 列出OSS上的文件 -//func (handler *Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) { -// // 列取文件 -// base = strings.TrimPrefix(base, "/") -// if base != "" { -// base += "/" -// } -// -// var ( -// delimiter string -// marker string -// objects []oss.ObjectProperties -// commons []string -// ) -// if !recursive { -// delimiter = "/" -// } -// -// for { -// subRes, err := handler.bucket.ListObjects(oss.Marker(marker), oss.Prefix(base), -// oss.MaxKeys(1000), oss.Delimiter(delimiter)) -// if err != nil { -// return nil, err -// } -// objects = append(objects, subRes.Objects...) -// commons = append(commons, subRes.CommonPrefixes...) -// marker = subRes.NextMarker -// if marker == "" { -// break -// } -// } -// -// // 处理列取结果 -// res := make([]response.Object, 0, len(objects)+len(commons)) -// // 处理目录 -// for _, object := range commons { -// rel, err := filepath.Rel(base, object) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(object), -// RelativePath: filepath.ToSlash(rel), -// Size: 0, -// IsDir: true, -// LastModify: time.Now(), -// }) -// } -// // 处理文件 -// for _, object := range objects { -// rel, err := filepath.Rel(base, object.Key) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(object.Key), -// Source: object.Key, -// RelativePath: filepath.ToSlash(rel), -// Size: uint64(object.Size), -// IsDir: false, -// LastModify: object.LastModified, -// }) -// } -// -// return res, nil -//} +// List 列出OSS上的文件 +func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + // 列取文件 + base = strings.TrimPrefix(base, "/") + if base != "" { + base += "/" + } + + var ( + delimiter string + marker string + objects []oss.ObjectProperties + commons []string + ) + if !recursive { + delimiter = "/" + } + + for { + subRes, err := handler.bucket.ListObjects(oss.Marker(marker), oss.Prefix(base), + oss.MaxKeys(1000), oss.Delimiter(delimiter)) + if err != nil { + return nil, err + } + objects = append(objects, subRes.Objects...) + commons = append(commons, subRes.CommonPrefixes...) + marker = subRes.NextMarker + if marker == "" { + break + } + } + + // 处理列取结果 + res := make([]fs.PhysicalObject, 0, len(objects)+len(commons)) + // 处理目录 + for _, object := range commons { + rel, err := filepath.Rel(base, object) + if err != nil { + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object), + RelativePath: filepath.ToSlash(rel), + Size: 0, + IsDir: true, + LastModify: time.Now(), + }) + } + onProgress(len(commons)) + + // 处理文件 + for _, object := range objects { + rel, err := filepath.Rel(base, object.Key) + if err != nil { + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object.Key), + Source: object.Key, + RelativePath: filepath.ToSlash(rel), + Size: object.Size, + IsDir: false, + LastModify: object.LastModified, + }) + } + onProgress(len(res)) + + return res, nil +} // Get 获取文件 func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) { diff --git a/pkg/filemanager/driver/qiniu/qiniu.go b/pkg/filemanager/driver/qiniu/qiniu.go index 06cfeeb5..a2c087db 100644 --- a/pkg/filemanager/driver/qiniu/qiniu.go +++ b/pkg/filemanager/driver/qiniu/qiniu.go @@ -5,6 +5,15 @@ import ( "encoding/base64" "errors" "fmt" + "io" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "strings" + "time" + "github.com/cloudreve/Cloudreve/v4/ent" "github.com/cloudreve/Cloudreve/v4/inventory/types" "github.com/cloudreve/Cloudreve/v4/pkg/boolset" @@ -22,11 +31,6 @@ import ( "github.com/qiniu/go-sdk/v7/auth/qbox" "github.com/qiniu/go-sdk/v7/storage" "github.com/samber/lo" - "io" - "net/http" - "net/url" - "os" - "time" ) const ( @@ -81,73 +85,75 @@ func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provid return driver, nil } -// -//// List 列出给定路径下的文件 -//func (handler *Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) { -// base = strings.TrimPrefix(base, "/") -// if base != "" { -// base += "/" -// } -// -// var ( -// delimiter string -// marker string -// objects []storage.ListItem -// commons []string -// ) -// if !recursive { -// delimiter = "/" -// } -// -// for { -// entries, folders, nextMarker, hashNext, err := handler.bucket.ListFiles( -// handler.policy.BucketName, -// base, delimiter, marker, 1000) -// if err != nil { -// return nil, err -// } -// objects = append(objects, entries...) -// commons = append(commons, folders...) -// if !hashNext { -// break -// } -// marker = nextMarker -// } -// -// // 处理列取结果 -// res := make([]response.Object, 0, len(objects)+len(commons)) -// // 处理目录 -// for _, object := range commons { -// rel, err := filepath.Rel(base, object) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(object), -// RelativePath: filepath.ToSlash(rel), -// Size: 0, -// IsDir: true, -// LastModify: time.Now(), -// }) -// } -// // 处理文件 -// for _, object := range objects { -// rel, err := filepath.Rel(base, object.Key) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(object.Key), -// Source: object.Key, -// RelativePath: filepath.ToSlash(rel), -// Size: uint64(object.Fsize), -// IsDir: false, -// LastModify: time.Unix(object.PutTime/10000000, 0), -// }) -// } -// -// return res, nil -//} +// List 列出给定路径下的文件 +func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + base = strings.TrimPrefix(base, "/") + if base != "" { + base += "/" + } + + var ( + delimiter string + marker string + objects []storage.ListItem + commons []string + ) + if !recursive { + delimiter = "/" + } + + for { + entries, folders, nextMarker, hashNext, err := handler.bucket.ListFiles( + handler.policy.BucketName, + base, delimiter, marker, 1000) + if err != nil { + return nil, err + } + objects = append(objects, entries...) + commons = append(commons, folders...) + if !hashNext { + break + } + marker = nextMarker + } + + // 处理列取结果 + res := make([]fs.PhysicalObject, 0, len(objects)+len(commons)) + // 处理目录 + for _, object := range commons { + rel, err := filepath.Rel(base, object) + if err != nil { + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object), + RelativePath: filepath.ToSlash(rel), + Size: 0, + IsDir: true, + LastModify: time.Now(), + }) + } + onProgress(len(commons)) + + // 处理文件 + for _, object := range objects { + rel, err := filepath.Rel(base, object.Key) + if err != nil { + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(object.Key), + Source: object.Key, + RelativePath: filepath.ToSlash(rel), + Size: int64(object.Fsize), + IsDir: false, + LastModify: time.Unix(object.PutTime/10000000, 0), + }) + } + onProgress(len(objects)) + + return res, nil +} // Put 将文件流保存到指定目录 func (handler *Driver) Put(ctx context.Context, file *fs.UploadRequest) error { diff --git a/pkg/filemanager/driver/remote/client.go b/pkg/filemanager/driver/remote/client.go index 94016178..7cf94c4e 100644 --- a/pkg/filemanager/driver/remote/client.go +++ b/pkg/filemanager/driver/remote/client.go @@ -5,6 +5,12 @@ import ( "context" "encoding/json" "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + "github.com/cloudreve/Cloudreve/v4/application/constants" "github.com/cloudreve/Cloudreve/v4/ent" "github.com/cloudreve/Cloudreve/v4/pkg/auth" @@ -19,11 +25,6 @@ import ( "github.com/cloudreve/Cloudreve/v4/pkg/serializer" "github.com/cloudreve/Cloudreve/v4/pkg/setting" "github.com/gofrs/uuid" - "io" - "net/http" - "net/url" - "strings" - "time" ) const ( @@ -45,6 +46,8 @@ type Client interface { MediaMeta(ctx context.Context, src, ext string) ([]driver.MediaMeta, error) // DeleteFiles deletes files from remote server DeleteFiles(ctx context.Context, files ...string) ([]string, error) + // List lists files from remote server + List(ctx context.Context, path string, recursive bool) ([]fs.PhysicalObject, error) } type DeleteFileRequest struct { @@ -229,6 +232,28 @@ func (c *remoteClient) CreateUploadSession(ctx context.Context, session *fs.Uplo return nil } +func (c *remoteClient) List(ctx context.Context, path string, recursive bool) ([]fs.PhysicalObject, error) { + resp, err := c.httpClient.Request( + http.MethodGet, + routes.SlaveFileListRoute(path, recursive), + nil, + request.WithContext(ctx), + request.WithLogger(c.l), + ).CheckHTTPResponse(200).DecodeResponse() + if err != nil { + return nil, err + } + + if resp.Code != 0 { + return nil, fmt.Errorf(resp.Error) + } + + var objects []fs.PhysicalObject + resp.GobDecode(&objects) + return objects, nil + +} + func (c *remoteClient) GetUploadURL(ctx context.Context, expires time.Time, sessionID string) (string, string, error) { base, err := url.Parse(c.policy.Edges.Node.Server) if err != nil { diff --git a/pkg/filemanager/driver/remote/remote.go b/pkg/filemanager/driver/remote/remote.go index 9278655e..eb2b5efb 100644 --- a/pkg/filemanager/driver/remote/remote.go +++ b/pkg/filemanager/driver/remote/remote.go @@ -4,6 +4,10 @@ import ( "context" "errors" "fmt" + "net/url" + "os" + "time" + "github.com/cloudreve/Cloudreve/v4/ent" "github.com/cloudreve/Cloudreve/v4/inventory/types" "github.com/cloudreve/Cloudreve/v4/pkg/auth" @@ -15,10 +19,6 @@ import ( "github.com/cloudreve/Cloudreve/v4/pkg/logging" "github.com/cloudreve/Cloudreve/v4/pkg/request" "github.com/cloudreve/Cloudreve/v4/pkg/setting" - "net/url" - "os" - "path" - "time" ) var ( @@ -54,108 +54,19 @@ func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provid }, nil } -//// List 列取文件 -//func (handler *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { -// var res []response.Object -// -// reqBody := serializer.ListRequest{ -// Path: path, -// Recursive: recursive, -// } -// reqBodyEncoded, err := json.Marshal(reqBody) -// if err != nil { -// return res, err -// } -// -// // 发送列表请求 -// bodyReader := strings.NewReader(string(reqBodyEncoded)) -// signTTL := model.GetIntSetting("slave_api_timeout", 60) -// resp, err := handler.Client.Request( -// "POST", -// handler.getAPIUrl("list"), -// bodyReader, -// request.WithCredential(handler.AuthInstance, int64(signTTL)), -// request.WithMasterMeta(handler.settings.SiteBasic(ctx).ID, handler.settings.SiteURL(setting.UseFirstSiteUrl(ctx)).String()), -// ).CheckHTTPResponse(200).DecodeResponse() -// if err != nil { -// return res, err -// } -// -// // 处理列取结果 -// if resp.Code != 0 { -// return res, errors.New(resp.Error) -// } -// -// if resStr, ok := resp.Data.(string); ok { -// err = json.Unmarshal([]byte(resStr), &res) -// if err != nil { -// return res, err -// } -// } -// -// return res, nil -//} - -// getAPIUrl 获取接口请求地址 -func (handler *Driver) getAPIUrl(scope string, routes ...string) string { - serverURL, err := url.Parse(handler.Policy.Edges.Node.Server) +// List 列取文件 +func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + res, err := handler.uploadClient.List(ctx, base, recursive) if err != nil { - return "" - } - var controller *url.URL - - switch scope { - case "delete": - controller, _ = url.Parse("/api/v3/slave/delete") - case "thumb": - controller, _ = url.Parse("/api/v3/slave/thumb") - case "list": - controller, _ = url.Parse("/api/v3/slave/list") - default: - controller = serverURL - } - - for _, r := range routes { - controller.Path = path.Join(controller.Path, r) + return nil, err } - return serverURL.ResolveReference(controller).String() + onProgress(len(res)) + return res, nil } // Open 获取文件内容 func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) { - //// 尝试获取速度限制 - //speedLimit := 0 - //if user, ok := ctx.Value(fsctx.UserCtx).(model.User); ok { - // speedLimit = user.Group.SpeedLimit - //} - // - //// 获取文件源地址 - //downloadURL, err := handler.Source(ctx, path, nil, true, int64(speedLimit)) - //if err != nil { - // return nil, err - //} - // - //// 获取文件数据流 - //resp, err := handler.Client.Request( - // "GET", - // downloadURL, - // nil, - // request.WithContext(ctx), - // request.WithTimeout(time.Duration(0)), - // request.WithMasterMeta(handler.settings.SiteBasic(ctx).ID, handler.settings.SiteURL(ctx).String()), - //).CheckHTTPResponse(200).GetRSCloser() - //if err != nil { - // return nil, err - //} - // - //resp.SetFirstFakeChunk() - // - //// 尝试获取文件大小 - //if file, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok { - // resp.SetContentLength(int64(file.Size)) - //} - return nil, errors.New("not implemented") } diff --git a/pkg/filemanager/driver/s3/s3.go b/pkg/filemanager/driver/s3/s3.go index bbfb5510..c8a6a292 100644 --- a/pkg/filemanager/driver/s3/s3.go +++ b/pkg/filemanager/driver/s3/s3.go @@ -7,6 +7,9 @@ import ( "io" "net/url" "os" + "path" + "path/filepath" + "strings" "time" "github.com/aws/aws-sdk-go/aws/awserr" @@ -100,82 +103,85 @@ func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provid return driver, nil } -//// List 列出给定路径下的文件 -//func (handler *Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) { -// // 初始化列目录参数 -// base = strings.TrimPrefix(base, "/") -// if base != "" { -// base += "/" -// } -// -// opt := &s3.ListObjectsInput{ -// Bucket: &handler.policy.BucketName, -// Prefix: &base, -// MaxKeys: aws.Int64(1000), -// } -// -// // 是否为递归列出 -// if !recursive { -// opt.Delimiter = aws.String("/") -// } -// -// var ( -// objects []*s3.Object -// commons []*s3.CommonPrefix -// ) -// -// for { -// res, err := handler.svc.ListObjectsWithContext(ctx, opt) -// if err != nil { -// return nil, err -// } -// objects = append(objects, res.Contents...) -// commons = append(commons, res.CommonPrefixes...) -// -// // 如果本次未列取完,则继续使用marker获取结果 -// if *res.IsTruncated { -// opt.Marker = res.NextMarker -// } else { -// break -// } -// } -// -// // 处理列取结果 -// res := make([]response.Object, 0, len(objects)+len(commons)) -// -// // 处理目录 -// for _, object := range commons { -// rel, err := filepath.Rel(*opt.Prefix, *object.Prefix) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(*object.Prefix), -// RelativePath: filepath.ToSlash(rel), -// Size: 0, -// IsDir: true, -// LastModify: time.Now(), -// }) -// } -// // 处理文件 -// for _, object := range objects { -// rel, err := filepath.Rel(*opt.Prefix, *object.Key) -// if err != nil { -// continue -// } -// res = append(res, response.Object{ -// Name: path.Base(*object.Key), -// Source: *object.Key, -// RelativePath: filepath.ToSlash(rel), -// Size: uint64(*object.Size), -// IsDir: false, -// LastModify: time.Now(), -// }) -// } -// -// return res, nil -// -//} +// List 列出给定路径下的文件 +func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + // 初始化列目录参数 + base = strings.TrimPrefix(base, "/") + if base != "" { + base += "/" + } + + opt := &s3.ListObjectsInput{ + Bucket: &handler.policy.BucketName, + Prefix: &base, + MaxKeys: aws.Int64(1000), + } + + // 是否为递归列出 + if !recursive { + opt.Delimiter = aws.String("/") + } + + var ( + objects []*s3.Object + commons []*s3.CommonPrefix + ) + + for { + res, err := handler.svc.ListObjectsWithContext(ctx, opt) + if err != nil { + return nil, err + } + objects = append(objects, res.Contents...) + commons = append(commons, res.CommonPrefixes...) + + // 如果本次未列取完,则继续使用marker获取结果 + if *res.IsTruncated { + opt.Marker = res.NextMarker + } else { + break + } + } + + // 处理列取结果 + res := make([]fs.PhysicalObject, 0, len(objects)+len(commons)) + + // 处理目录 + for _, object := range commons { + rel, err := filepath.Rel(*opt.Prefix, *object.Prefix) + if err != nil { + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(*object.Prefix), + RelativePath: filepath.ToSlash(rel), + Size: 0, + IsDir: true, + LastModify: time.Now(), + }) + } + onProgress(len(commons)) + + // 处理文件 + for _, object := range objects { + rel, err := filepath.Rel(*opt.Prefix, *object.Key) + if err != nil { + continue + } + res = append(res, fs.PhysicalObject{ + Name: path.Base(*object.Key), + Source: *object.Key, + RelativePath: filepath.ToSlash(rel), + Size: int64(*object.Size), + IsDir: false, + LastModify: time.Now(), + }) + } + onProgress(len(objects)) + + return res, nil + +} // Open 打开文件 func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) { diff --git a/pkg/filemanager/driver/upyun/upyun.go b/pkg/filemanager/driver/upyun/upyun.go index 2b055cc5..35fe5e7d 100644 --- a/pkg/filemanager/driver/upyun/upyun.go +++ b/pkg/filemanager/driver/upyun/upyun.go @@ -10,6 +10,15 @@ import ( "encoding/json" "errors" "fmt" + "io" + "net/url" + "os" + "path" + "strconv" + "strings" + "sync" + "time" + "github.com/cloudreve/Cloudreve/v4/ent" "github.com/cloudreve/Cloudreve/v4/inventory/types" "github.com/cloudreve/Cloudreve/v4/pkg/boolset" @@ -23,12 +32,6 @@ import ( "github.com/cloudreve/Cloudreve/v4/pkg/setting" "github.com/gin-gonic/gin" "github.com/upyun/go-sdk/upyun" - "io" - "net/url" - "os" - "strconv" - "strings" - "time" ) type ( @@ -78,66 +81,67 @@ func New(ctx context.Context, policy *ent.StoragePolicy, settings setting.Provid return driver, nil } -//func (handler *Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) { -// base = strings.TrimPrefix(base, "/") -// -// // 用于接受SDK返回对象的chan -// objChan := make(chan *upyun.FileInfo) -// objects := []*upyun.FileInfo{} -// -// // 列取配置 -// listConf := &upyun.GetObjectsConfig{ -// Path: "/" + base, -// ObjectsChan: objChan, -// MaxListTries: 1, -// } -// // 递归列取时不限制递归次数 -// if recursive { -// listConf.MaxListLevel = -1 -// } -// -// // 启动一个goroutine收集列取的对象信 -// wg := &sync.WaitGroup{} -// wg.Add(1) -// go func(input chan *upyun.FileInfo, output *[]*upyun.FileInfo, wg *sync.WaitGroup) { -// defer wg.Done() -// for { -// file, ok := <-input -// if !ok { -// return -// } -// *output = append(*output, file) -// } -// }(objChan, &objects, wg) -// -// up := upyun.NewUpYun(&upyun.UpYunConfig{ -// Bucket: handler.policy.BucketName, -// Operator: handler.policy.AccessKey, -// Password: handler.policy.SecretKey, -// }) -// -// err := up.List(listConf) -// if err != nil { -// return nil, err -// } -// -// wg.Wait() -// -// // 汇总处理列取结果 -// res := make([]response.Object, 0, len(objects)) -// for _, object := range objects { -// res = append(res, response.Object{ -// Name: path.Base(object.Name), -// RelativePath: object.Name, -// Source: path.Join(base, object.Name), -// Size: uint64(object.Size), -// IsDir: object.IsDir, -// LastModify: object.Time, -// }) -// } -// -// return res, nil -//} +func (handler *Driver) List(ctx context.Context, base string, onProgress driver.ListProgressFunc, recursive bool) ([]fs.PhysicalObject, error) { + base = strings.TrimPrefix(base, "/") + + // 用于接受SDK返回对象的chan + objChan := make(chan *upyun.FileInfo) + objects := []*upyun.FileInfo{} + + // 列取配置 + listConf := &upyun.GetObjectsConfig{ + Path: "/" + base, + ObjectsChan: objChan, + MaxListTries: 1, + } + // 递归列取时不限制递归次数 + if recursive { + listConf.MaxListLevel = -1 + } + + // 启动一个goroutine收集列取的对象信 + wg := &sync.WaitGroup{} + wg.Add(1) + go func(input chan *upyun.FileInfo, output *[]*upyun.FileInfo, wg *sync.WaitGroup) { + defer wg.Done() + for { + file, ok := <-input + if !ok { + return + } + *output = append(*output, file) + onProgress(1) + } + }(objChan, &objects, wg) + + up := upyun.NewUpYun(&upyun.UpYunConfig{ + Bucket: handler.policy.BucketName, + Operator: handler.policy.AccessKey, + Password: handler.policy.SecretKey, + }) + + err := up.List(listConf) + if err != nil { + return nil, err + } + + wg.Wait() + + // 汇总处理列取结果 + res := make([]fs.PhysicalObject, 0, len(objects)) + for _, object := range objects { + res = append(res, fs.PhysicalObject{ + Name: path.Base(object.Name), + RelativePath: object.Name, + Source: path.Join(base, object.Name), + Size: int64(object.Size), + IsDir: object.IsDir, + LastModify: object.Time, + }) + } + + return res, nil +} func (handler *Driver) Open(ctx context.Context, path string) (*os.File, error) { return nil, errors.New("not implemented") diff --git a/pkg/filemanager/fs/dbfs/dbfs.go b/pkg/filemanager/fs/dbfs/dbfs.go index 5289547c..590d5181 100644 --- a/pkg/filemanager/fs/dbfs/dbfs.go +++ b/pkg/filemanager/fs/dbfs/dbfs.go @@ -653,6 +653,7 @@ func (f *DBFS) createFile(ctx context.Context, parent *File, name string, fileTy Size: o.UploadRequest.Props.Size, ModifiedAt: o.UploadRequest.Props.LastModified, UploadSessionID: uuid.FromStringOrNil(o.UploadRequest.Props.UploadSessionID), + Importing: o.UploadRequest.ImportFrom != nil, } } diff --git a/pkg/filemanager/fs/dbfs/upload.go b/pkg/filemanager/fs/dbfs/upload.go index 5e592869..0c647d3b 100644 --- a/pkg/filemanager/fs/dbfs/upload.go +++ b/pkg/filemanager/fs/dbfs/upload.go @@ -117,7 +117,14 @@ func (f *DBFS) PrepareUpload(ctx context.Context, req *fs.UploadRequest, opts .. } // Get parent folder storage policy and performs validation - policy, err := f.getPreferredPolicy(ctx, ancestor) + var ( + policy *ent.StoragePolicy + ) + if req.ImportFrom == nil { + policy, err = f.getPreferredPolicy(ctx, ancestor) + } else { + policy, err = f.storagePolicyClient.GetPolicyByID(ctx, req.Props.PreferredStoragePolicy) + } if err != nil { return nil, err } @@ -133,7 +140,9 @@ func (f *DBFS) PrepareUpload(ctx context.Context, req *fs.UploadRequest, opts .. } // Generate save path by storage policy - isThumbnailAndPolicyNotAvailable := policy.ID != ancestor.Model.StoragePolicyFiles && (req.Props.EntityType != nil && *req.Props.EntityType == types.EntityTypeThumbnail) + isThumbnailAndPolicyNotAvailable := policy.ID != ancestor.Model.StoragePolicyFiles && + (req.Props.EntityType != nil && *req.Props.EntityType == types.EntityTypeThumbnail) && + req.ImportFrom == nil if req.Props.SavePath == "" || isThumbnailAndPolicyNotAvailable { req.Props.SavePath = generateSavePath(policy, req, f.user) if isThumbnailAndPolicyNotAvailable { @@ -174,7 +183,6 @@ func (f *DBFS) PrepareUpload(ctx context.Context, req *fs.UploadRequest, opts .. fileId = ancestor.ID() entityId = entity.ID() targetFile = ancestor.Model - lockToken = ls.Exclude(lr, f.user, f.hasher) } else { uploadPlaceholder, err := f.Create(ctx, req.Props.Uri, types.FileTypeFile, fs.WithUploadRequest(req), @@ -190,15 +198,19 @@ func (f *DBFS) PrepareUpload(ctx context.Context, req *fs.UploadRequest, opts .. fileId = uploadPlaceholder.ID() entityId = uploadPlaceholder.Entities()[0].ID() targetFile = uploadPlaceholder.(*File).Model - lockToken = ls.Exclude(lr, f.user, f.hasher) } - // create metadata to record uploading entity id - if err := fc.UpsertMetadata(ctx, targetFile, map[string]string{ - MetadataUploadSessionID: req.Props.UploadSessionID, - }, nil); err != nil { - _ = inventory.Rollback(dbTx) - return nil, serializer.NewError(serializer.CodeDBError, "Failed to update upload session metadata", err) + if req.ImportFrom == nil { + // If not importing, we can keep the lock + lockToken = ls.Exclude(lr, f.user, f.hasher) + + // create metadata to record uploading entity id + if err := fc.UpsertMetadata(ctx, targetFile, map[string]string{ + MetadataUploadSessionID: req.Props.UploadSessionID, + }, nil); err != nil { + _ = inventory.Rollback(dbTx) + return nil, serializer.NewError(serializer.CodeDBError, "Failed to update upload session metadata", err) + } } if err := inventory.CommitWithStorageDiff(ctx, dbTx, f.l, f.userClient); err != nil { @@ -217,6 +229,7 @@ func (f *DBFS) PrepareUpload(ctx context.Context, req *fs.UploadRequest, opts .. }, FileID: fileId, NewFileCreated: !fileExisted, + Importing: req.ImportFrom != nil, EntityID: entityId, UID: f.user.ID, Policy: policy, diff --git a/pkg/filemanager/fs/fs.go b/pkg/filemanager/fs/fs.go index 1962e8a2..7d9583b5 100644 --- a/pkg/filemanager/fs/fs.go +++ b/pkg/filemanager/fs/fs.go @@ -258,6 +258,7 @@ type ( ChunkSize int64 SentinelTaskID int NewFileCreated bool // If new file is created for this session + Importing bool // If the upload is importing from another file LockToken string // Token of the locked placeholder file Props *UploadProps @@ -377,6 +378,15 @@ type ( Size int64 OmitName bool // if true, file name will not be validated } + + PhysicalObject struct { + Name string `json:"name"` + Source string `json:"source"` + RelativePath string `json:"relative_path"` + Size int64 `json:"size"` + IsDir bool `json:"is_dir"` + LastModify time.Time `json:"last_modify"` + } ) const ( @@ -599,7 +609,8 @@ type ( Offset int64 ProgressFunc `json:"-"` - read int64 + ImportFrom *PhysicalObject `json:"-"` + read int64 } ) diff --git a/pkg/filemanager/fs/uri.go b/pkg/filemanager/fs/uri.go index 584b0c5a..e94ee5f4 100644 --- a/pkg/filemanager/fs/uri.go +++ b/pkg/filemanager/fs/uri.go @@ -344,6 +344,13 @@ func NewShareUri(id, password string) string { return fmt.Sprintf("%s://%s@%s", constants.CloudreveScheme, id, constants.FileSystemShare) } +func NewMyUri(id string) string { + if id == "" { + return fmt.Sprintf("%s://%s", constants.CloudreveScheme, constants.FileSystemMy) + } + return fmt.Sprintf("%s://%s@%s", constants.CloudreveScheme, id, constants.FileSystemMy) +} + // PathEscape is same as url.PathEscape, with modifications to incoporate with JS encodeURIComponent: // encodeURI() escapes all characters except: // diff --git a/pkg/filemanager/manager/entity.go b/pkg/filemanager/manager/entity.go index 6ffe058f..3dd587dd 100644 --- a/pkg/filemanager/manager/entity.go +++ b/pkg/filemanager/manager/entity.go @@ -11,11 +11,13 @@ import ( "github.com/cloudreve/Cloudreve/v4/ent/user" "github.com/cloudreve/Cloudreve/v4/inventory/types" "github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes" + "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/driver" "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs" "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs" "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager/entitysource" "github.com/cloudreve/Cloudreve/v4/pkg/hashid" "github.com/cloudreve/Cloudreve/v4/pkg/serializer" + "github.com/gofrs/uuid" "github.com/samber/lo" ) @@ -41,6 +43,10 @@ type ( ExtractAndSaveMediaMeta(ctx context.Context, uri *fs.URI, entityID int) error // RecycleEntities recycles a group of entities RecycleEntities(ctx context.Context, force bool, entityIDs ...int) error + // ListPhysical lists physical files in a path + ListPhysical(ctx context.Context, path string, policyID int, recursive bool, progress driver.ListProgressFunc) ([]fs.PhysicalObject, error) + // ImportPhysical imports a physical file to a Cloudreve file + ImportPhysical(ctx context.Context, dst *fs.URI, policyId int, src fs.PhysicalObject, completeHook bool) error } DirectLink struct { File fs.File @@ -369,6 +375,51 @@ func (l *manager) DeleteVersion(ctx context.Context, path *fs.URI, version int) return l.fs.VersionControl(ctx, path, version, true) } +func (l *manager) ListPhysical(ctx context.Context, path string, policyID int, recursive bool, progress driver.ListProgressFunc) ([]fs.PhysicalObject, error) { + policy, err := l.dep.StoragePolicyClient().GetPolicyByID(ctx, policyID) + if err != nil { + return nil, err + } + + driver, err := l.GetStorageDriver(ctx, policy) + if err != nil { + return nil, err + } + + return driver.List(ctx, path, progress, recursive) +} + +func (l *manager) ImportPhysical(ctx context.Context, dst *fs.URI, policyId int, src fs.PhysicalObject, completeHook bool) error { + targetUri := dst.Join(src.RelativePath) + req := &fs.UploadRequest{ + Props: &fs.UploadProps{ + Uri: targetUri, + UploadSessionID: uuid.Must(uuid.NewV4()).String(), + Size: src.Size, + PreferredStoragePolicy: policyId, + SavePath: src.Source, + LastModified: &src.LastModify, + }, + ImportFrom: &src, + } + + // Prepare for upload + uploadSession, err := l.fs.PrepareUpload(ctx, req) + if err != nil { + return fmt.Errorf("faield to prepare uplaod: %w", err) + } + if completeHook { + d, err := l.GetStorageDriver(ctx, l.CastStoragePolicyOnSlave(ctx, uploadSession.Policy)) + if err != nil { + return err + } + + l.onNewEntityUploaded(ctx, uploadSession, d) + } + + return nil +} + func entityUrlCacheKey(id int, speed int64, displayName string, download bool, siteUrl string) string { hash := sha1.New() hash.Write([]byte(fmt.Sprintf("%d_%d_%s_%t_%s", id, diff --git a/pkg/filemanager/manager/recycle.go b/pkg/filemanager/manager/recycle.go index 6213e350..63014864 100644 --- a/pkg/filemanager/manager/recycle.go +++ b/pkg/filemanager/manager/recycle.go @@ -219,9 +219,13 @@ func (m *manager) RecycleEntities(ctx context.Context, force bool, entityIDs ... mapSrcToId[entity.Source()] = entity.ID() } - res, err := d.Delete(ctx, lo.Map(chunk, func(entity fs.Entity, index int) string { + toBeDeletedSrc := lo.Map(lo.Filter(chunk, func(item fs.Entity, index int) bool { + // Only delete entities that are not marked as "unlink only" + return item.Model().RecycleOptions == nil || !item.Model().RecycleOptions.UnlinkOnly + }), func(entity fs.Entity, index int) string { return entity.Source() - })...) + }) + res, err := d.Delete(ctx, toBeDeletedSrc...) if err != nil { for _, src := range res { ae.Add(strconv.Itoa(mapSrcToId[src]), err) diff --git a/pkg/filemanager/manager/thumbnail.go b/pkg/filemanager/manager/thumbnail.go index 09a1efc2..78e3f3e2 100644 --- a/pkg/filemanager/manager/thumbnail.go +++ b/pkg/filemanager/manager/thumbnail.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/cloudreve/Cloudreve/v4/pkg/thumb" "os" "runtime" "time" @@ -270,6 +271,11 @@ func (m *GenerateThumbTask) Do(ctx context.Context) (task.Status, error) { res, err := m.m.generateThumb(ctx, m.uri, m.ext, m.es) if err != nil { + if errors.Is(err, thumb.ErrNotAvailable) { + m.sig <- &generateRes{nil, err} + return task.StatusCompleted, nil + } + return task.StatusError, err } diff --git a/pkg/filemanager/manager/upload.go b/pkg/filemanager/manager/upload.go index 4eae2640..0337761a 100644 --- a/pkg/filemanager/manager/upload.go +++ b/pkg/filemanager/manager/upload.go @@ -339,7 +339,7 @@ func (m *manager) OnUploadFailed(ctx context.Context, session *fs.UploadSession) if err := m.Delete(ctx, []*fs.URI{session.Props.Uri}, fs.WithSysSkipSoftDelete(true)); err != nil { m.l.Warning("OnUploadFailed hook failed to delete file: %s", err) } - } else { + } else if !session.Importing { if err := m.fs.VersionControl(ctx, session.Props.Uri, session.EntityID, true); err != nil { m.l.Warning("OnUploadFailed hook failed to version control: %s", err) } diff --git a/pkg/filemanager/workflows/extract.go b/pkg/filemanager/workflows/extract.go index 9f7d8659..7126e60b 100644 --- a/pkg/filemanager/workflows/extract.go +++ b/pkg/filemanager/workflows/extract.go @@ -61,8 +61,9 @@ const ( ProgressTypeExtractSize = "extract_size" ProgressTypeDownload = "download" - SummaryKeySrc = "src" - SummaryKeyDst = "dst" + SummaryKeySrc = "src" + SummaryKeySrcPhysical = "src_physical" + SummaryKeyDst = "dst" ) func init() { diff --git a/pkg/filemanager/workflows/import.go b/pkg/filemanager/workflows/import.go new file mode 100644 index 00000000..84b0ed3c --- /dev/null +++ b/pkg/filemanager/workflows/import.go @@ -0,0 +1,197 @@ +package workflows + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync/atomic" + + "github.com/cloudreve/Cloudreve/v4/application/dependency" + "github.com/cloudreve/Cloudreve/v4/ent" + "github.com/cloudreve/Cloudreve/v4/ent/task" + "github.com/cloudreve/Cloudreve/v4/inventory" + "github.com/cloudreve/Cloudreve/v4/inventory/types" + "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs" + "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager" + "github.com/cloudreve/Cloudreve/v4/pkg/hashid" + "github.com/cloudreve/Cloudreve/v4/pkg/logging" + "github.com/cloudreve/Cloudreve/v4/pkg/queue" + "github.com/cloudreve/Cloudreve/v4/pkg/serializer" +) + +type ( + ImportTask struct { + *queue.DBTask + + l logging.Logger + state *ImportTaskState + progress queue.Progresses + } + ImportTaskState struct { + PolicyID int `json:"policy_id"` + Src string `json:"src"` + Recursive bool `json:"is_recursive"` + Dst string `json:"dst"` + Phase ImportTaskPhase `json:"phase"` + Failed int `json:"failed,omitempty"` + ExtractMediaMeta bool `json:"extract_media_meta"` + } + ImportTaskPhase string +) + +const ( + ProgressTypeImported = "imported" + ProgressTypeIndexed = "indexed" +) + +func init() { + queue.RegisterResumableTaskFactory(queue.ImportTaskType, NewImportTaskFromModel) +} + +func NewImportTask(ctx context.Context, u *ent.User, src string, recursive bool, dst string, policyID int) (queue.Task, error) { + state := &ImportTaskState{ + Src: src, + Recursive: recursive, + Dst: dst, + PolicyID: policyID, + } + stateBytes, err := json.Marshal(state) + if err != nil { + return nil, fmt.Errorf("failed to marshal state: %w", err) + } + + t := &ImportTask{ + DBTask: &queue.DBTask{ + Task: &ent.Task{ + Type: queue.ImportTaskType, + CorrelationID: logging.CorrelationID(ctx), + PrivateState: string(stateBytes), + PublicState: &types.TaskPublicState{}, + }, + DirectOwner: u, + }, + } + + return t, nil +} + +func NewImportTaskFromModel(task *ent.Task) queue.Task { + return &ImportTask{ + DBTask: &queue.DBTask{ + Task: task, + }, + } +} + +func (m *ImportTask) Do(ctx context.Context) (task.Status, error) { + dep := dependency.FromContext(ctx) + m.l = dep.Logger() + + m.Lock() + if m.progress == nil { + m.progress = make(queue.Progresses) + } + m.progress[ProgressTypeIndexed] = &queue.Progress{} + m.Unlock() + + // unmarshal state + state := &ImportTaskState{} + if err := json.Unmarshal([]byte(m.State()), state); err != nil { + return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err) + } + m.state = state + + next, err := m.processImport(ctx, dep) + + newStateStr, marshalErr := json.Marshal(m.state) + if marshalErr != nil { + return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr) + } + + m.Lock() + m.Task.PrivateState = string(newStateStr) + m.Unlock() + return next, err +} + +func (m *ImportTask) processImport(ctx context.Context, dep dependency.Dep) (task.Status, error) { + user := inventory.UserFromContext(ctx) + fm := manager.NewFileManager(dep, user) + defer fm.Recycle() + + failed := 0 + dst, err := fs.NewUriFromString(m.state.Dst) + if err != nil { + return task.StatusError, fmt.Errorf("failed to parse dst: %s (%w)", err, queue.CriticalErr) + } + + physicalFiles, err := fm.ListPhysical(ctx, m.state.Src, m.state.PolicyID, m.state.Recursive, + func(i int) { + atomic.AddInt64(&m.progress[ProgressTypeIndexed].Current, int64(i)) + }) + if err != nil { + return task.StatusError, fmt.Errorf("failed to list physical files: %w", err) + } + + m.l.Info("Importing %d physical files", len(physicalFiles)) + + m.Lock() + m.progress[ProgressTypeImported] = &queue.Progress{ + Total: int64(len(physicalFiles)), + } + delete(m.progress, ProgressTypeIndexed) + m.Unlock() + + for _, physicalFile := range physicalFiles { + if physicalFile.IsDir { + m.l.Info("Creating folder %s", physicalFile.RelativePath) + _, err := fm.Create(ctx, dst.Join(physicalFile.RelativePath), types.FileTypeFolder) + atomic.AddInt64(&m.progress[ProgressTypeImported].Current, 1) + if err != nil { + m.l.Warning("Failed to create folder %s: %s", physicalFile.RelativePath, err) + failed++ + } + } else { + m.l.Info("Importing file %s", physicalFile.RelativePath) + err := fm.ImportPhysical(ctx, dst, m.state.PolicyID, physicalFile, m.state.ExtractMediaMeta) + atomic.AddInt64(&m.progress[ProgressTypeImported].Current, 1) + if err != nil { + var appErr serializer.AppError + if errors.As(err, &appErr) && appErr.Code == serializer.CodeObjectExist { + m.l.Info("File %s already exists, skipping", physicalFile.RelativePath) + continue + } + m.l.Error("Failed to import file %s: %s, skipping", physicalFile.RelativePath, err) + failed++ + } + } + } + + return task.StatusCompleted, nil +} + +func (m *ImportTask) Progress(ctx context.Context) queue.Progresses { + m.Lock() + defer m.Unlock() + return m.progress +} + +func (m *ImportTask) Summarize(hasher hashid.Encoder) *queue.Summary { + // unmarshal state + if m.state == nil { + if err := json.Unmarshal([]byte(m.State()), &m.state); err != nil { + return nil + } + } + + return &queue.Summary{ + Phase: string(m.state.Phase), + Props: map[string]any{ + SummaryKeyDst: m.state.Dst, + SummaryKeySrcStr: m.state.Src, + SummaryKeyFailed: m.state.Failed, + SummaryKeySrcDstPolicyID: hashid.EncodePolicyID(hasher, m.state.PolicyID), + }, + } +} diff --git a/pkg/queue/task.go b/pkg/queue/task.go index 5318faa6..90bb7429 100644 --- a/pkg/queue/task.go +++ b/pkg/queue/task.go @@ -103,6 +103,7 @@ const ( ExtractArchiveTaskType = "extract_archive" RelocateTaskType = "relocate" RemoteDownloadTaskType = "remote_download" + ImportTaskType = "import" SlaveCreateArchiveTaskType = "slave_create_archive" SlaveUploadTaskType = "slave_upload" diff --git a/routers/controllers/file.go b/routers/controllers/file.go index 8b70c5fa..2f6866ef 100644 --- a/routers/controllers/file.go +++ b/routers/controllers/file.go @@ -34,6 +34,23 @@ func CreateArchive(c *gin.Context) { } } +// ImportFiles imports files +func ImportFiles(c *gin.Context) { + service := ParametersFromContext[*explorer.ImportWorkflowService](c, explorer.CreateImportParamCtx{}) + resp, err := service.CreateImportTask(c) + if err != nil { + c.JSON(200, serializer.Err(c, err)) + c.Abort() + return + } + + if resp != nil { + c.JSON(200, serializer.Response{ + Data: resp, + }) + } +} + // CreateRemoteDownload creates remote download task func CreateRemoteDownload(c *gin.Context) { service := ParametersFromContext[*explorer.DownloadWorkflowService](c, explorer.CreateDownloadParamCtx{}) diff --git a/routers/controllers/slave.go b/routers/controllers/slave.go index 9ed9f59f..82133b3b 100644 --- a/routers/controllers/slave.go +++ b/routers/controllers/slave.go @@ -118,13 +118,15 @@ func SlavePing(c *gin.Context) { // SlaveList 从机列出文件 func SlaveList(c *gin.Context) { - var service explorer.SlaveListService - if err := c.ShouldBindJSON(&service); err == nil { - res := service.List(c) - c.JSON(200, res) - } else { - c.JSON(200, ErrorResponse(err)) + service := ParametersFromContext[*explorer.SlaveListService](c, explorer.SlaveListParamCtx{}) + objects, err := service.List(c) + if err != nil { + c.JSON(200, serializer.Err(c, err)) + c.Abort() + return } + + c.JSON(200, serializer.NewResponseWithGobData(c, objects)) } // SlaveDownloadTaskCreate creates a download task on slave diff --git a/routers/router.go b/routers/router.go index 0389fb4c..f117b8c5 100644 --- a/routers/router.go +++ b/routers/router.go @@ -97,6 +97,11 @@ func initSlaveFileRouter(v4 *gin.RouterGroup) { file.DELETE("", controllers.FromJSON[explorer.SlaveDeleteFileService](explorer.SlaveDeleteFileParamCtx{}), controllers.SlaveDelete) + // 列出文件 + file.GET("list", + controllers.FromQuery[explorer.SlaveListService](explorer.SlaveListParamCtx{}), + controllers.SlaveList, + ) } } @@ -683,6 +688,12 @@ func initMasterRouter(dep dependency.Dep) *gin.Engine { controllers.FromJSON[explorer.CreateViewerSessionService](explorer.CreateViewerSessionParamCtx{}), controllers.CreateViewerSession, ) + // Create task to import files + wf.POST("import", + middleware.IsAdmin(), + controllers.FromJSON[explorer.ImportWorkflowService](explorer.CreateImportParamCtx{}), + controllers.ImportFiles, + ) // 取得文件外链 file.PUT("source", diff --git a/service/explorer/file.go b/service/explorer/file.go index 84fb754b..7fa77f3f 100644 --- a/service/explorer/file.go +++ b/service/explorer/file.go @@ -45,26 +45,6 @@ func init() { gob.Register(ArchiveDownloadSession{}) } -// List 列出从机上的文件 -func (service *SlaveListService) List(c *gin.Context) serializer.Response { - //// 创建文件系统 - //fs, err := filesystem.NewAnonymousFileSystem() - //if err != nil { - // return serializer.ErrDeprecated(serializer.CodeCreateFSError, "", err) - //} - //defer fs.Recycle() - // - //objects, err := fs.Handler.List(context.Background(), service.Path, service.Recursive) - //if err != nil { - // return serializer.ErrDeprecated(serializer.CodeIOFailed, "Cannot list files", err) - //} - // - //res, _ := json.Marshal(objects) - //return serializer.Response{Data: string(res)} - - return serializer.Response{} -} - // ArchiveService 文件流式打包下載服务 type ( ArchiveService struct { diff --git a/service/explorer/slave.go b/service/explorer/slave.go index f63e902a..67fafc76 100644 --- a/service/explorer/slave.go +++ b/service/explorer/slave.go @@ -3,6 +3,8 @@ package explorer import ( "encoding/base64" "fmt" + "strings" + "github.com/cloudreve/Cloudreve/v4/application/dependency" "github.com/cloudreve/Cloudreve/v4/inventory/types" "github.com/cloudreve/Cloudreve/v4/pkg/cluster/routes" @@ -14,7 +16,6 @@ import ( "github.com/cloudreve/Cloudreve/v4/pkg/serializer" "github.com/gin-gonic/gin" "github.com/samber/lo" - "strings" ) // SlaveDownloadService 从机文件下載服务 @@ -35,12 +36,6 @@ type SlaveFilesService struct { Files []string `json:"files" binding:"required,gt=0"` } -// SlaveListService 从机列表服务 -type SlaveListService struct { - Path string `json:"path" binding:"required,min=1,max=65535"` - Recursive bool `json:"recursive"` -} - // SlaveServe serves file content func (s *EntityDownloadService) SlaveServe(c *gin.Context) error { dep := dependency.FromContext(c) @@ -249,3 +244,25 @@ func (service *SlaveDeleteFileService) Delete(c *gin.Context) ([]string, error) return nil, nil } + +type ( + SlaveListParamCtx struct{} + SlaveListService struct { + Path string `uri:"path" binding:"required"` + Recursive bool `uri:"recursive"` + } +) + +func (s *SlaveListService) List(c *gin.Context) ([]fs.PhysicalObject, error) { + dep := dependency.FromContext(c) + m := manager.NewFileManager(dep, nil) + defer m.Recycle() + d := m.LocalDriver(nil) + + objects, err := d.List(c, s.Path, func(i int) {}, s.Recursive) + if err != nil { + return nil, fmt.Errorf("failed to list files: %w", err) + } + + return objects, nil +} diff --git a/service/explorer/workflows.go b/service/explorer/workflows.go index a6f614c7..c108754e 100644 --- a/service/explorer/workflows.go +++ b/service/explorer/workflows.go @@ -2,6 +2,7 @@ package explorer import ( "encoding/gob" + "github.com/cloudreve/Cloudreve/v4/pkg/hashid" "time" "github.com/cloudreve/Cloudreve/v4/application/dependency" @@ -258,6 +259,57 @@ func (service *ArchiveWorkflowService) CreateCompressTask(c *gin.Context) (*Task return BuildTaskResponse(t, nil, hasher), nil } +type ( + ImportWorkflowService struct { + Src string `json:"src" binding:"required"` + Dst string `json:"dst" binding:"required"` + ExtractMediaMeta bool `json:"extract_media_meta"` + UserID string `json:"user_id" binding:"required"` + Recursive bool `json:"recursive"` + PolicyID int `json:"policy_id" binding:"required"` + } + CreateImportParamCtx struct{} +) + +func (service *ImportWorkflowService) CreateImportTask(c *gin.Context) (*TaskResponse, error) { + dep := dependency.FromContext(c) + user := inventory.UserFromContext(c) + hasher := dep.HashIDEncoder() + m := manager.NewFileManager(dep, user) + defer m.Recycle() + + if !user.Edges.Group.Permissions.Enabled(int(types.GroupPermissionIsAdmin)) { + return nil, serializer.NewError(serializer.CodeGroupNotAllowed, "Only admin can import files", nil) + } + + userId, err := hasher.Decode(service.UserID, hashid.UserID) + if err != nil { + return nil, serializer.NewError(serializer.CodeParamErr, "Invalid user id", err) + } + + owner, err := dep.UserClient().GetLoginUserByID(c, userId) + if err != nil || owner.ID == 0 { + return nil, serializer.NewError(serializer.CodeDBError, "Failed to get user", err) + } + + dst, err := fs.NewUriFromString(fs.NewMyUri(service.UserID)) + if err != nil { + return nil, serializer.NewError(serializer.CodeParamErr, "Invalid destination", err) + } + + // Create task + t, err := workflows.NewImportTask(c, owner, service.Src, service.Recursive, dst.Join(service.Dst).String(), service.PolicyID) + if err != nil { + return nil, serializer.NewError(serializer.CodeCreateTaskError, "Failed to create task", err) + } + + if err := dep.IoIntenseQueue(c).QueueTask(c, t); err != nil { + return nil, serializer.NewError(serializer.CodeCreateTaskError, "Failed to queue task", err) + } + + return BuildTaskResponse(t, nil, hasher), nil +} + type ( ListTaskService struct { PageSize int `form:"page_size" binding:"required,min=10,max=100"` @@ -279,7 +331,7 @@ func (service *ListTaskService) ListTasks(c *gin.Context) (*TaskListResponse, er PageToken: service.NextPageToken, PageSize: service.PageSize, }, - Types: []string{queue.CreateArchiveTaskType, queue.ExtractArchiveTaskType, queue.RelocateTaskType}, + Types: []string{queue.CreateArchiveTaskType, queue.ExtractArchiveTaskType, queue.RelocateTaskType, queue.ImportTaskType}, UserID: user.ID, }