diff --git a/pkg/task/import.go b/pkg/task/import.go new file mode 100644 index 0000000..b4f1927 --- /dev/null +++ b/pkg/task/import.go @@ -0,0 +1,216 @@ +package task + +import ( + "context" + "encoding/json" + model "github.com/HFO4/cloudreve/models" + "github.com/HFO4/cloudreve/pkg/filesystem" + "github.com/HFO4/cloudreve/pkg/filesystem/driver/local" + "github.com/HFO4/cloudreve/pkg/filesystem/fsctx" + "github.com/HFO4/cloudreve/pkg/util" + "path" +) + +// ImportTask 导入务 +type ImportTask struct { + User *model.User + TaskModel *model.Task + TaskProps ImportProps + Err *JobError +} + +// ImportProps 导入任务属性 +type ImportProps struct { + PolicyID uint `json:"policy_id"` // 存储策略ID + Src string `json:"src"` // 原始路径 + Recursive bool `json:"is_recursive"` // 是否递归导入 + Dst string `json:"dst"` // 目的目录 +} + +// Props 获取任务属性 +func (job *ImportTask) Props() string { + res, _ := json.Marshal(job.TaskProps) + return string(res) +} + +// Type 获取任务状态 +func (job *ImportTask) Type() int { + return ImportTaskType +} + +// Creator 获取创建者ID +func (job *ImportTask) Creator() uint { + return job.User.ID +} + +// Model 获取任务的数据库模型 +func (job *ImportTask) Model() *model.Task { + return job.TaskModel +} + +// SetStatus 设定状态 +func (job *ImportTask) SetStatus(status int) { + job.TaskModel.SetStatus(status) +} + +// SetError 设定任务失败信息 +func (job *ImportTask) SetError(err *JobError) { + job.Err = err + res, _ := json.Marshal(job.Err) + job.TaskModel.SetError(string(res)) +} + +// SetErrorMsg 设定任务失败信息 +func (job *ImportTask) SetErrorMsg(msg string, err error) { + jobErr := &JobError{Msg: msg} + if err != nil { + jobErr.Error = err.Error() + } + job.SetError(jobErr) +} + +// GetError 返回任务失败信息 +func (job *ImportTask) GetError() *JobError { + return job.Err +} + +// Do 开始执行任务 +func (job *ImportTask) Do() { + ctx := context.Background() + + // 查找存储策略 + policy, err := model.GetPolicyByID(job.TaskProps.PolicyID) + if err != nil { + job.SetErrorMsg("找不到存储策略", err) + return + } + + // 创建文件系统 + job.User.Policy = policy + fs, err := filesystem.NewFileSystem(job.User) + if err != nil { + job.SetErrorMsg(err.Error(), nil) + return + } + defer fs.Recycle() + + // 注册钩子 + fs.Use("BeforeAddFile", filesystem.HookValidateFile) + fs.Use("BeforeAddFile", filesystem.HookValidateCapacity) + fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity) + + // 列取目录、对象 + job.TaskModel.SetProgress(ListingProgress) + coxIgnoreConflict := context.WithValue(context.Background(), fsctx.IgnoreConflictCtx, + true) + objects, err := fs.Handler.List(ctx, job.TaskProps.Src, job.TaskProps.Recursive) + if err != nil { + job.SetErrorMsg("无法列取文件", err) + return + } + + job.TaskModel.SetProgress(InsertingProgress) + + // 虚拟目录路径与folder对象ID的对应 + pathCache := make(map[string]*model.Folder, len(objects)) + + // 插入目录记录到用户文件系统 + for _, object := range objects { + if object.IsDir { + // 创建目录 + virtualPath := path.Join(job.TaskProps.Dst, object.RelativePath) + folder, err := fs.CreateDirectory(coxIgnoreConflict, virtualPath) + if err != nil { + util.Log().Warning("导入任务无法创建用户目录[%s], %s", virtualPath, err) + } else if folder.ID > 0 { + pathCache[virtualPath] = folder + } + } + } + + // 插入文件记录到用户文件系统 + for _, object := range objects { + if !object.IsDir { + // 创建文件信息 + virtualPath := path.Dir(path.Join(job.TaskProps.Dst, object.RelativePath)) + fileHeader := local.FileStream{ + Size: object.Size, + VirtualPath: virtualPath, + Name: object.Name, + } + addFileCtx := context.WithValue(ctx, fsctx.FileHeaderCtx, fileHeader) + addFileCtx = context.WithValue(addFileCtx, fsctx.SavePathCtx, object.Source) + + // 查找父目录 + parentFolder := &model.Folder{} + if parent, ok := pathCache[virtualPath]; ok { + parentFolder = parent + } else { + if exist, folder := fs.IsPathExist(virtualPath); exist { + parentFolder = folder + } else { + util.Log().Warning("导入任务无法创插入文件[%s], 父目录不存在", + object.RelativePath) + continue + } + } + + // 插入文件记录 + _, err := fs.AddFile(addFileCtx, parentFolder) + if err != nil { + util.Log().Warning("导入任务无法创插入文件[%s], %s", + object.RelativePath, err) + if err == filesystem.ErrInsufficientCapacity { + job.SetErrorMsg("容量不足", err) + return + } + } + + } + } +} + +// NewImportTask 新建导入任务 +func NewImportTask(user, policy uint, src, dst string, recursive bool) (Job, error) { + creator, err := model.GetActiveUserByID(user) + if err != nil { + return nil, err + } + + newTask := &ImportTask{ + User: &creator, + TaskProps: ImportProps{ + PolicyID: policy, + Recursive: recursive, + Src: src, + Dst: dst, + }, + } + + record, err := Record(newTask) + if err != nil { + return nil, err + } + newTask.TaskModel = record + + return newTask, nil +} + +// NewImportTaskFromModel 从数据库记录中恢复导入任务 +func NewImportTaskFromModel(task *model.Task) (Job, error) { + user, err := model.GetActiveUserByID(task.UserID) + if err != nil { + return nil, err + } + newTask := &ImportTask{ + User: &user, + TaskModel: task, + } + + err = json.Unmarshal([]byte(task.Props), &newTask.TaskProps) + if err != nil { + return nil, err + } + + return newTask, nil +} diff --git a/pkg/task/job.go b/pkg/task/job.go index caac69f..8f96514 100644 --- a/pkg/task/job.go +++ b/pkg/task/job.go @@ -13,6 +13,8 @@ const ( DecompressTaskType // TransferTaskType 中转任务 TransferTaskType + // ImportTaskType 导入任务 + ImportTaskType ) // 任务状态 @@ -41,6 +43,10 @@ const ( DownloadingProgress // Transferring 转存中 TransferringProgress + // ListingProgress 索引中 + ListingProgress + // InsertingProgress 插入中 + InsertingProgress ) // Job 任务接口 @@ -103,6 +109,8 @@ func GetJobFromModel(task *model.Task) (Job, error) { return NewDecompressTaskFromModel(task) case TransferTaskType: return NewTransferTaskFromModel(task) + case ImportTaskType: + return NewImportTaskFromModel(task) default: return nil, ErrUnknownTaskType } diff --git a/routers/router.go b/routers/router.go index 4adb23a..53232b8 100644 --- a/routers/router.go +++ b/routers/router.go @@ -387,6 +387,8 @@ func InitMasterRouter() *gin.Engine { task.POST("list", controllers.AdminListTask) // 删除 task.POST("delete", controllers.AdminDeleteTask) + // 新建文件导入任务 + task.POST("import", controllers.AdminCreateImportTask) } } diff --git a/service/admin/task.go b/service/admin/task.go index 98a5f35..a6a9b34 100644 --- a/service/admin/task.go +++ b/service/admin/task.go @@ -3,6 +3,7 @@ package admin import ( model "github.com/HFO4/cloudreve/models" "github.com/HFO4/cloudreve/pkg/serializer" + "github.com/HFO4/cloudreve/pkg/task" "github.com/gin-gonic/gin" "strings" ) @@ -12,6 +13,26 @@ type TaskBatchService struct { ID []uint `json:"id" binding:"min=1"` } +// ImportTaskService 导入任务 +type ImportTaskService struct { + UID uint `json:"uid" binding:"required"` + PolicyID uint `json:"policy_id" binding:"required"` + Src string `json:"src" binding:"required,min=1,max=65535"` + Dst string `json:"dst" binding:"required,min=1,max=65535"` + Recursive bool `json:"recursive"` +} + +// Create 新建导入任务 +func (service *ImportTaskService) Create(c *gin.Context, user *model.User) serializer.Response { + // 创建任务 + job, err := task.NewImportTask(service.UID, service.PolicyID, service.Src, service.Dst, service.Recursive) + if err != nil { + return serializer.Err(serializer.CodeNotSet, "任务创建失败", err) + } + task.TaskPoll.Submit(job) + return serializer.Response{} +} + // Delete 删除任务 func (service *TaskBatchService) Delete(c *gin.Context) serializer.Response { if err := model.DB.Where("id in (?)", service.ID).Delete(&model.Download{}).Error; err != nil {