You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cloudreve/pkg/task/import.go

222 lines
5.4 KiB

package task
import (
"context"
"encoding/json"
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/filesystem"
"github.com/HFO4/cloudreve/pkg/filesystem/driver/local"
"github.com/HFO4/cloudreve/pkg/filesystem/fsctx"
"github.com/HFO4/cloudreve/pkg/util"
"path"
)
// ImportTask is a background job that imports objects listed from a storage
// policy into a user's file system.
type ImportTask struct {
User *model.User // user the task runs as; the file system in Do is created for this user
TaskModel *model.Task // database record backing this task
TaskProps ImportProps // import parameters
Err *JobError // error most recently stored by SetError, if any
}
// ImportProps holds the parameters of an import task. It is encoded to and
// decoded from JSON using the field tags below.
type ImportProps struct {
PolicyID uint `json:"policy_id"` // ID of the storage policy
Src string `json:"src"` // source path to import from
Recursive bool `json:"is_recursive"` // whether to import recursively
Dst string `json:"dst"` // destination directory
}
// Props returns the task's import properties encoded as a JSON string.
func (job *ImportTask) Props() string {
	// The marshal error is discarded: ImportProps only holds a uint, two
	// strings and a bool, so encoding is not expected to fail.
	encoded, _ := json.Marshal(job.TaskProps)
	return string(encoded)
}
// Type returns the task type identifier, ImportTaskType.
func (job *ImportTask) Type() int {
return ImportTaskType
}
// Creator returns the ID of the user that owns this task.
func (job *ImportTask) Creator() uint {
return job.User.ID
}
// Model returns the database model backing this task.
func (job *ImportTask) Model() *model.Task {
return job.TaskModel
}
// SetStatus sets the task status by forwarding it to the task's database
// model.
func (job *ImportTask) SetStatus(status int) {
job.TaskModel.SetStatus(status)
}
// SetError records err as the task's failure information: it is kept on the
// task itself and its JSON encoding is stored on the task's database model.
func (job *ImportTask) SetError(err *JobError) {
	job.Err = err

	// The marshal error is discarded; whatever was encoded is stored as-is.
	payload, _ := json.Marshal(err)
	job.TaskModel.SetError(string(payload))
}
// SetErrorMsg records a failure described by msg. When err is non-nil its
// text is attached to the recorded JobError as well.
func (job *ImportTask) SetErrorMsg(msg string, err error) {
	failure := JobError{Msg: msg}
	if err != nil {
		failure.Error = err.Error()
	}
	job.SetError(&failure)
}
// GetError returns the failure information last stored by SetError, or nil
// if none has been stored.
func (job *ImportTask) GetError() *JobError {
return job.Err
}
// Do runs the import task.
//
// It loads the storage policy named by TaskProps.PolicyID, assigns it to
// job.User, and creates a file system for that user. It then lists
// TaskProps.Src through the file system's handler, creates every listed
// directory under TaskProps.Dst, and finally adds a file record for every
// listed file.
//
// Failures to find the policy, create the file system or list the source are
// recorded with SetErrorMsg and abort the task. Failures to create a directory
// or add a file are only logged, and the import carries on; the one exception
// is filesystem.ErrInsufficientCapacity, which is recorded and aborts the
// task.
func (job *ImportTask) Do() {
	ctx := context.Background()

	// Look up the storage policy.
	policy, err := model.GetPolicyByID(job.TaskProps.PolicyID)
	if err != nil {
		job.SetErrorMsg("找不到存储策略", err)
		return
	}

	// Create the file system for the task's user under that policy.
	job.User.Policy = policy
	fs, err := filesystem.NewFileSystem(job.User)
	if err != nil {
		job.SetErrorMsg(err.Error(), nil)
		return
	}
	defer fs.Recycle()

	// Register hooks.
	fs.Use("BeforeAddFile", filesystem.HookValidateFile)
	fs.Use("BeforeAddFile", filesystem.HookValidateCapacity)
	fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity)

	// List directories and objects from the source.
	job.TaskModel.SetProgress(ListingProgress)
	objects, err := fs.Handler.List(ctx, job.TaskProps.Src, job.TaskProps.Recursive)
	if err != nil {
		job.SetErrorMsg("无法列取文件", err)
		return
	}

	job.TaskModel.SetProgress(InsertingProgress)

	// Directory creation runs with the ignore-conflict flag set in its context.
	ignoreConflictCtx := context.WithValue(ctx, fsctx.IgnoreConflictCtx, true)

	// Maps a virtual directory path to its folder record, so files sharing a
	// parent directory resolve it only once.
	pathCache := make(map[string]*model.Folder, len(objects))

	// First pass: create a folder record for every listed directory.
	for _, object := range objects {
		if !object.IsDir {
			continue
		}

		virtualPath := path.Join(job.TaskProps.Dst, object.RelativePath)
		folder, err := fs.CreateDirectory(ignoreConflictCtx, virtualPath)
		if err != nil {
			util.Log().Warning("导入任务无法创建用户目录[%s], %s", virtualPath, err)
		} else if folder.ID > 0 {
			pathCache[virtualPath] = folder
		}
	}

	// Second pass: add a file record for every listed file.
	for _, object := range objects {
		if object.IsDir {
			continue
		}

		// Describe the file and where it is stored.
		virtualPath := path.Dir(path.Join(job.TaskProps.Dst, object.RelativePath))
		fileHeader := local.FileStream{
			Size:        object.Size,
			VirtualPath: virtualPath,
			Name:        object.Name,
		}
		addFileCtx := context.WithValue(ctx, fsctx.FileHeaderCtx, fileHeader)
		addFileCtx = context.WithValue(addFileCtx, fsctx.SavePathCtx, object.Source)

		// Resolve the parent folder: cache first, then an existing folder,
		// and only then create it.
		parentFolder, cached := pathCache[virtualPath]
		if !cached {
			if exist, folder := fs.IsPathExist(virtualPath); exist {
				parentFolder = folder
			} else {
				folder, err := fs.CreateDirectory(ctx, virtualPath)
				if err != nil {
					util.Log().Warning("导入任务无法创建用户目录[%s], %s",
						virtualPath, err)
					continue
				}
				parentFolder = folder
			}

			// Remember the folder so the remaining files in this directory
			// skip the lookup. Same ID guard as the directory pass above.
			if parentFolder != nil && parentFolder.ID > 0 {
				pathCache[virtualPath] = parentFolder
			}
		}

		// Add the file record.
		if _, err := fs.AddFile(addFileCtx, parentFolder); err != nil {
			util.Log().Warning("导入任务无法创插入文件[%s], %s",
				object.RelativePath, err)
			// Running out of capacity dooms every remaining file too, so
			// record the failure and stop.
			if err == filesystem.ErrInsufficientCapacity {
				job.SetErrorMsg("容量不足", err)
				return
			}
		}
	}
}
// NewImportTask creates an import task for the given user ID and storage
// policy ID, importing from src into dst, optionally recursively.
//
// The user is loaded with model.GetActiveUserByID and the new task is passed
// to Record; the record it returns becomes the task's model. An error from
// either call is returned unchanged, with a nil Job.
func NewImportTask(user, policy uint, src, dst string, recursive bool) (Job, error) {
	creator, err := model.GetActiveUserByID(user)
	if err != nil {
		return nil, err
	}

	props := ImportProps{
		PolicyID:  policy,
		Src:       src,
		Dst:       dst,
		Recursive: recursive,
	}
	job := &ImportTask{User: &creator, TaskProps: props}

	record, err := Record(job)
	if err != nil {
		return nil, err
	}
	job.TaskModel = record

	return job, nil
}
// NewImportTaskFromModel restores an import task from its database record.
//
// The owner is loaded with model.GetActiveUserByID using task.UserID, and the
// import properties are decoded from the JSON stored in task.Props. An error
// from either step is returned unchanged, with a nil Job.
func NewImportTaskFromModel(task *model.Task) (Job, error) {
	owner, err := model.GetActiveUserByID(task.UserID)
	if err != nil {
		return nil, err
	}

	restored := &ImportTask{User: &owner, TaskModel: task}
	if err := json.Unmarshal([]byte(task.Props), &restored.TaskProps); err != nil {
		return nil, err
	}

	return restored, nil
}