refactor: refactor recycle aria2 temp file

pull/1422/head
XYenon 3 years ago
parent 915fefc5d4
commit b7c6155d3d

@ -38,8 +38,6 @@ const (
Downloading
// Paused 暂停中
Paused
// Seeding 做种中
Seeding
// Error 出错
Error
// Complete 完成
@ -48,6 +46,8 @@ const (
Canceled
// Unknown 未知状态
Unknown
// Seeding 做种中
Seeding
)
var (

@ -251,7 +251,7 @@ func (monitor *Monitor) Complete(pool task.Pool) bool {
// 转存完成,回收下载目录
if transferTask.Type == task.TransferTaskType && transferTask.Status >= task.Error {
job, err := task.NewRecycleTask(monitor.Task.UserID, monitor.Task.Parent, monitor.node.ID())
job, err := task.NewRecycleTask(monitor.Task)
if err != nil {
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()

@ -122,42 +122,3 @@ func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]respo
func (d *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}
func (d *Driver) Recycle(ctx context.Context, path string) error {
req := serializer.SlaveRecycleReq{
Path: path,
}
body, err := json.Marshal(req)
if err != nil {
return err
}
// 订阅回收结果
resChan := mq.GlobalMQ.Subscribe(req.Hash(model.GetSettingByName("siteID")), 0)
defer mq.GlobalMQ.Unsubscribe(req.Hash(model.GetSettingByName("siteID")), resChan)
res, err := d.client.Request("PUT", "task/recycle", bytes.NewReader(body)).
CheckHTTPResponse(200).
DecodeResponse()
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
// 等待回收结果或者超时
waitTimeout := model.GetIntSetting("slave_transfer_timeout", 172800)
select {
case <-time.After(time.Duration(waitTimeout) * time.Second):
return ErrWaitResultTimeout
case msg := <-resChan:
if msg.Event != serializer.SlaveRecycleSuccess {
return errors.New(msg.Content.(serializer.SlaveRecycleResult).Error)
}
}
return nil
}

@ -54,35 +54,15 @@ func (s *SlaveTransferReq) Hash(id string) string {
return fmt.Sprintf("%x", bs)
}
// SlaveRecycleReq 从机回收任务创建请求
type SlaveRecycleReq struct {
Path string `json:"path"`
}
// Hash 返回创建请求的唯一标识,保持创建请求幂等
func (s *SlaveRecycleReq) Hash(id string) string {
h := sha1.New()
h.Write([]byte(fmt.Sprintf("transfer-%s-%s", id, s.Path)))
bs := h.Sum(nil)
return fmt.Sprintf("%x", bs)
}
const (
SlaveTransferSuccess = "success"
SlaveTransferFailed = "failed"
SlaveRecycleSuccess = "success"
SlaveRecycleFailed = "failed"
)
type SlaveTransferResult struct {
Error string
}
type SlaveRecycleResult struct {
Error string
}
func init() {
gob.Register(SlaveTransferResult{})
gob.Register(SlaveRecycleResult{})
}

@ -1,14 +1,10 @@
package task
import (
"context"
"encoding/json"
"os"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slaveinmaster"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
@ -22,10 +18,8 @@ type RecycleTask struct {
// RecycleProps 回收任务属性
type RecycleProps struct {
// 回收目录
Path string `json:"path"`
// 负责处理回收任务的节点ID
NodeID uint `json:"node_id"`
// 下载任务 GID
DownloadGID string `json:"download_gid"`
}
// Props 获取任务属性
@ -59,7 +53,6 @@ func (job *RecycleTask) SetError(err *JobError) {
job.Err = err
res, _ := json.Marshal(job.Err)
job.TaskModel.SetError(string(res))
}
// SetErrorMsg 设定任务失败信息
@ -78,51 +71,33 @@ func (job *RecycleTask) GetError() *JobError {
// Do 开始执行任务
func (job *RecycleTask) Do() {
if job.TaskProps.NodeID == 1 {
err := os.RemoveAll(job.TaskProps.Path)
if err != nil {
util.Log().Warning("无法删除中转临时目录[%s], %s", job.TaskProps.Path, err)
job.SetErrorMsg("文件回收失败", err)
}
} else {
// 指定为从机回收
// 创建文件系统
fs, err := filesystem.NewFileSystem(job.User)
if err != nil {
job.SetErrorMsg(err.Error(), nil)
return
}
// 获取从机节点
node := cluster.Default.GetNodeByID(job.TaskProps.NodeID)
if node == nil {
job.SetErrorMsg("从机节点不可用", nil)
}
// 切换为从机节点处理回收
fs.SwitchToSlaveHandler(node)
handler := fs.Handler.(*slaveinmaster.Driver)
err = handler.Recycle(context.Background(), job.TaskProps.Path)
if err != nil {
util.Log().Warning("无法删除中转临时目录[%s], %s", job.TaskProps.Path, err)
job.SetErrorMsg("文件回收失败", err)
}
download, err := model.GetDownloadByGid(job.TaskProps.DownloadGID, job.User.ID)
if err != nil {
util.Log().Warning("回收任务 %d 找不到下载记录", job.TaskModel.ID)
job.SetErrorMsg("无法找到下载任务", err)
return
}
}
// NewRecycleTask 新建回收任务
func NewRecycleTask(user uint, path string, node uint) (Job, error) {
creator, err := model.GetActiveUserByID(user)
nodeID := download.GetNodeID()
node := cluster.Default.GetNodeByID(nodeID)
if node == nil {
util.Log().Warning("回收任务 %d 找不到节点", job.TaskModel.ID)
job.SetErrorMsg("从机节点不可用", nil)
return
}
err = node.GetAria2Instance().DeleteTempFile(download)
if err != nil {
return nil, err
util.Log().Warning("无法删除中转临时目录[%s], %s", download.Parent, err)
job.SetErrorMsg("文件回收失败", err)
return
}
}
// NewRecycleTask 新建回收任务
func NewRecycleTask(download *model.Download) (Job, error) {
newTask := &RecycleTask{
User: &creator,
User: download.GetOwner(),
TaskProps: RecycleProps{
Path: path,
NodeID: node,
DownloadGID: download.GID,
},
}

@ -54,32 +54,6 @@ func TestRecycleTask_SetError(t *testing.T) {
asserts.Equal("error", task.GetError().Msg)
}
func TestRecycleTask_Do(t *testing.T) {
asserts := assert.New(t)
task := &RecycleTask{
TaskModel: &model.Task{
Model: gorm.Model{ID: 1},
},
}
// 目录不存在
{
task.TaskProps.Path = "test/not_exist"
task.User = &model.User{
Policy: model.Policy{
Type: "unknown",
},
}
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1,
1))
mock.ExpectCommit()
task.Do()
asserts.NoError(mock.ExpectationsWereMet())
asserts.NotEmpty(task.GetError().Msg)
}
}
func TestNewRecycleTask(t *testing.T) {
asserts := assert.New(t)
@ -89,7 +63,13 @@ func TestNewRecycleTask(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
job, err := NewRecycleTask(1, "/", 0)
job, err := NewRecycleTask(&model.Download{
Model: gorm.Model{ID: 1},
GID: "test_g_id",
Parent: "/",
UserID: 1,
NodeID: 1,
})
asserts.NoError(mock.ExpectationsWereMet())
asserts.NotNil(job)
asserts.NoError(err)
@ -101,7 +81,13 @@ func TestNewRecycleTask(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)").WillReturnError(errors.New("error"))
mock.ExpectRollback()
job, err := NewRecycleTask(1, "test/not_exist", 0)
job, err := NewRecycleTask(&model.Download{
Model: gorm.Model{ID: 1},
GID: "test_g_id",
Parent: "test/not_exist",
UserID: 1,
NodeID: 1,
})
asserts.NoError(mock.ExpectationsWereMet())
asserts.Nil(job)
asserts.Error(err)

@ -1,95 +0,0 @@
package slavetask
import (
"os"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
// RecycleTask 文件回收任务
type RecycleTask struct {
Err *task.JobError
Req *serializer.SlaveRecycleReq
MasterID string
}
// Props 获取任务属性
func (job *RecycleTask) Props() string {
return ""
}
// Type 获取任务类型
func (job *RecycleTask) Type() int {
return 0
}
// Creator 获取创建者ID
func (job *RecycleTask) Creator() uint {
return 0
}
// Model 获取任务的数据库模型
func (job *RecycleTask) Model() *model.Task {
return nil
}
// SetStatus 设定状态
func (job *RecycleTask) SetStatus(status int) {
}
// SetError 设定任务失败信息
func (job *RecycleTask) SetError(err *task.JobError) {
job.Err = err
}
// SetErrorMsg 设定任务失败信息
func (job *RecycleTask) SetErrorMsg(msg string, err error) {
jobErr := &task.JobError{Msg: msg}
if err != nil {
jobErr.Error = err.Error()
}
job.SetError(jobErr)
notifyMsg := mq.Message{
TriggeredBy: job.MasterID,
Event: serializer.SlaveRecycleFailed,
Content: serializer.SlaveRecycleResult{
Error: err.Error(),
},
}
if err = cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil {
util.Log().Warning("无法发送回收失败通知到从机, %s", err)
}
}
// GetError 返回任务失败信息
func (job *RecycleTask) GetError() *task.JobError {
return job.Err
}
// Do 开始执行任务
func (job *RecycleTask) Do() {
err := os.RemoveAll(job.Req.Path)
if err != nil {
util.Log().Warning("无法删除中转临时文件[%s], %s", job.Req.Path, err)
job.SetErrorMsg("文件回收失败", err)
return
}
msg := mq.Message{
TriggeredBy: job.MasterID,
Event: serializer.SlaveRecycleSuccess,
Content: serializer.SlaveRecycleResult{},
}
if err = cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil {
util.Log().Warning("无法发送回收成功通知到从机, %s", err)
}
}

@ -212,17 +212,6 @@ func SlaveCreateTransferTask(c *gin.Context) {
}
}
// SlaveCreateRecycleTask 从机创建回收任务
func SlaveCreateRecycleTask(c *gin.Context) {
var service serializer.SlaveRecycleReq
if err := c.ShouldBindJSON(&service); err == nil {
res := explorer.CreateRecycleTask(c, &service)
c.JSON(200, res)
} else {
c.JSON(200, ErrorResponse(err))
}
}
// SlaveNotificationPush 处理从机发送的消息推送
func SlaveNotificationPush(c *gin.Context) {
var service node.SlaveNotificationService

@ -88,7 +88,6 @@ func InitSlaveRouter() *gin.Engine {
task := v3.Group("task")
{
task.PUT("transfer", controllers.SlaveCreateTransferTask)
task.PUT("recycle", controllers.SlaveCreateRecycleTask)
}
}
return r

@ -57,7 +57,7 @@ func (service *DownloadTaskService) Delete(c *gin.Context) serializer.Response {
return serializer.Err(serializer.CodeNotFound, "Download record not found", err)
}
if download.Status >= common.Error {
if download.Status >= common.Error && download.Status <= common.Unknown {
// 如果任务已完成,则删除任务记录
if err := download.Delete(); err != nil {
return serializer.DBErr("Failed to delete task record", err)

@ -166,26 +166,6 @@ func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serial
return serializer.ParamErr("未知的主机节点ID", nil)
}
// CreateRecycleTask 创建从机文件回收任务
func CreateRecycleTask(c *gin.Context, req *serializer.SlaveRecycleReq) serializer.Response {
if id, ok := c.Get("MasterSiteID"); ok {
job := &slavetask.RecycleTask{
Req: req,
MasterID: id.(string),
}
if err := cluster.DefaultController.SubmitTask(job.MasterID, job, req.Hash(job.MasterID), func(job interface{}) {
task.TaskPoll.Submit(job.(task.Job))
}); err != nil {
return serializer.Err(serializer.CodeCreateTaskError, "", err)
}
return serializer.Response{}
}
return serializer.ParamErr("未知的主机节点ID", nil)
}
// SlaveListService 从机上传会话服务
type SlaveCreateUploadSessionService struct {
Session serializer.UploadSession `json:"session" binding:"required"`

Loading…
Cancel
Save