diff --git a/pkg/aria2/common/common.go b/pkg/aria2/common/common.go index d4a8313d..455c89f0 100644 --- a/pkg/aria2/common/common.go +++ b/pkg/aria2/common/common.go @@ -38,8 +38,6 @@ const ( Downloading // Paused 暂停中 Paused - // Seeding 做种中 - Seeding // Error 出错 Error // Complete 完成 @@ -48,6 +46,8 @@ const ( Canceled // Unknown 未知状态 Unknown + // Seeding 做种中 + Seeding ) var ( diff --git a/pkg/aria2/monitor/monitor.go b/pkg/aria2/monitor/monitor.go index 6f6de7e9..531d6edd 100644 --- a/pkg/aria2/monitor/monitor.go +++ b/pkg/aria2/monitor/monitor.go @@ -251,7 +251,7 @@ func (monitor *Monitor) Complete(pool task.Pool) bool { // 转存完成,回收下载目录 if transferTask.Type == task.TransferTaskType && transferTask.Status >= task.Error { - job, err := task.NewRecycleTask(monitor.Task.UserID, monitor.Task.Parent, monitor.node.ID()) + job, err := task.NewRecycleTask(monitor.Task) if err != nil { monitor.setErrorStatus(err) monitor.RemoveTempFolder() diff --git a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go index 84116394..4dd9da87 100644 --- a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go @@ -122,42 +122,3 @@ func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]respo func (d *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { return nil } - -func (d *Driver) Recycle(ctx context.Context, path string) error { - req := serializer.SlaveRecycleReq{ - Path: path, - } - - body, err := json.Marshal(req) - if err != nil { - return err - } - - // 订阅回收结果 - resChan := mq.GlobalMQ.Subscribe(req.Hash(model.GetSettingByName("siteID")), 0) - defer mq.GlobalMQ.Unsubscribe(req.Hash(model.GetSettingByName("siteID")), resChan) - - res, err := d.client.Request("PUT", "task/recycle", bytes.NewReader(body)). - CheckHTTPResponse(200). - DecodeResponse() - if err != nil { - return err - } - - if res.Code != 0 { - return serializer.NewErrorFromResponse(res) - } - - // 等待回收结果或者超时 - waitTimeout := model.GetIntSetting("slave_transfer_timeout", 172800) - select { - case <-time.After(time.Duration(waitTimeout) * time.Second): - return ErrWaitResultTimeout - case msg := <-resChan: - if msg.Event != serializer.SlaveRecycleSuccess { - return errors.New(msg.Content.(serializer.SlaveRecycleResult).Error) - } - } - - return nil -} diff --git a/pkg/serializer/slave.go b/pkg/serializer/slave.go index 4179d455..04d56d3d 100644 --- a/pkg/serializer/slave.go +++ b/pkg/serializer/slave.go @@ -54,35 +54,15 @@ func (s *SlaveTransferReq) Hash(id string) string { return fmt.Sprintf("%x", bs) } -// SlaveRecycleReq 从机回收任务创建请求 -type SlaveRecycleReq struct { - Path string `json:"path"` -} - -// Hash 返回创建请求的唯一标识,保持创建请求幂等 -func (s *SlaveRecycleReq) Hash(id string) string { - h := sha1.New() - h.Write([]byte(fmt.Sprintf("transfer-%s-%s", id, s.Path))) - bs := h.Sum(nil) - return fmt.Sprintf("%x", bs) -} - const ( SlaveTransferSuccess = "success" SlaveTransferFailed = "failed" - SlaveRecycleSuccess = "success" - SlaveRecycleFailed = "failed" ) type SlaveTransferResult struct { Error string } -type SlaveRecycleResult struct { - Error string -} - func init() { gob.Register(SlaveTransferResult{}) - gob.Register(SlaveRecycleResult{}) } diff --git a/pkg/task/recycle.go b/pkg/task/recycle.go index 23abd96b..17eaf3c2 100644 --- a/pkg/task/recycle.go +++ b/pkg/task/recycle.go @@ -1,14 +1,10 @@ package task import ( - "context" "encoding/json" - "os" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" - "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" - "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slaveinmaster" "github.com/cloudreve/Cloudreve/v3/pkg/util" ) @@ -22,10 +18,8 @@ type RecycleTask struct { // RecycleProps 回收任务属性 type RecycleProps struct { - // 回收目录 - Path string `json:"path"` - // 负责处理回收任务的节点ID - NodeID uint `json:"node_id"` + // 下载任务 GID + DownloadGID string `json:"download_gid"` } // Props 获取任务属性 @@ -59,7 +53,6 @@ func (job *RecycleTask) SetError(err *JobError) { job.Err = err res, _ := json.Marshal(job.Err) job.TaskModel.SetError(string(res)) - } // SetErrorMsg 设定任务失败信息 @@ -78,51 +71,33 @@ func (job *RecycleTask) GetError() *JobError { // Do 开始执行任务 func (job *RecycleTask) Do() { - if job.TaskProps.NodeID == 1 { - err := os.RemoveAll(job.TaskProps.Path) - if err != nil { - util.Log().Warning("无法删除中转临时目录[%s], %s", job.TaskProps.Path, err) - job.SetErrorMsg("文件回收失败", err) - } - } else { - // 指定为从机回收 - - // 创建文件系统 - fs, err := filesystem.NewFileSystem(job.User) - if err != nil { - job.SetErrorMsg(err.Error(), nil) - return - } - - // 获取从机节点 - node := cluster.Default.GetNodeByID(job.TaskProps.NodeID) - if node == nil { - job.SetErrorMsg("从机节点不可用", nil) - } - - // 切换为从机节点处理回收 - fs.SwitchToSlaveHandler(node) - handler := fs.Handler.(*slaveinmaster.Driver) - err = handler.Recycle(context.Background(), job.TaskProps.Path) - if err != nil { - util.Log().Warning("无法删除中转临时目录[%s], %s", job.TaskProps.Path, err) - job.SetErrorMsg("文件回收失败", err) - } + download, err := model.GetDownloadByGid(job.TaskProps.DownloadGID, job.User.ID) + if err != nil { + util.Log().Warning("回收任务 %d 找不到下载记录", job.TaskModel.ID) + job.SetErrorMsg("无法找到下载任务", err) + return } -} - -// NewRecycleTask 新建回收任务 -func NewRecycleTask(user uint, path string, node uint) (Job, error) { - creator, err := model.GetActiveUserByID(user) + nodeID := download.GetNodeID() + node := cluster.Default.GetNodeByID(nodeID) + if node == nil { + util.Log().Warning("回收任务 %d 找不到节点", job.TaskModel.ID) + job.SetErrorMsg("从机节点不可用", nil) + return + } + err = node.GetAria2Instance().DeleteTempFile(download) if err != nil { - return nil, err + util.Log().Warning("无法删除中转临时目录[%s], %s", download.Parent, err) + job.SetErrorMsg("文件回收失败", err) + return } +} +// NewRecycleTask 新建回收任务 +func NewRecycleTask(download *model.Download) (Job, error) { newTask := &RecycleTask{ - User: &creator, + User: download.GetOwner(), TaskProps: RecycleProps{ - Path: path, - NodeID: node, + DownloadGID: download.GID, }, } diff --git a/pkg/task/recycle_test.go b/pkg/task/recycle_test.go index 3fad4778..0092a30c 100644 --- a/pkg/task/recycle_test.go +++ b/pkg/task/recycle_test.go @@ -54,32 +54,6 @@ func TestRecycleTask_SetError(t *testing.T) { asserts.Equal("error", task.GetError().Msg) } -func TestRecycleTask_Do(t *testing.T) { - asserts := assert.New(t) - task := &RecycleTask{ - TaskModel: &model.Task{ - Model: gorm.Model{ID: 1}, - }, - } - - // 目录不存在 - { - task.TaskProps.Path = "test/not_exist" - task.User = &model.User{ - Policy: model.Policy{ - Type: "unknown", - }, - } - mock.ExpectBegin() - mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, - 1)) - mock.ExpectCommit() - task.Do() - asserts.NoError(mock.ExpectationsWereMet()) - asserts.NotEmpty(task.GetError().Msg) - } -} - func TestNewRecycleTask(t *testing.T) { asserts := assert.New(t) @@ -89,7 +63,13 @@ func TestNewRecycleTask(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT(.+)").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - job, err := NewRecycleTask(1, "/", 0) + job, err := NewRecycleTask(&model.Download{ + Model: gorm.Model{ID: 1}, + GID: "test_g_id", + Parent: "/", + UserID: 1, + NodeID: 1, + }) asserts.NoError(mock.ExpectationsWereMet()) asserts.NotNil(job) asserts.NoError(err) @@ -101,7 +81,13 @@ func TestNewRecycleTask(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT(.+)").WillReturnError(errors.New("error")) mock.ExpectRollback() - job, err := NewRecycleTask(1, "test/not_exist", 0) + job, err := NewRecycleTask(&model.Download{ + Model: gorm.Model{ID: 1}, + GID: "test_g_id", + Parent: "test/not_exist", + UserID: 1, + NodeID: 1, + }) asserts.NoError(mock.ExpectationsWereMet()) asserts.Nil(job) asserts.Error(err) diff --git a/pkg/task/slavetask/recycle.go b/pkg/task/slavetask/recycle.go deleted file mode 100644 index d8c7bc8d..00000000 --- a/pkg/task/slavetask/recycle.go +++ /dev/null @@ -1,95 +0,0 @@ -package slavetask - -import ( - "os" - - model "github.com/cloudreve/Cloudreve/v3/models" - "github.com/cloudreve/Cloudreve/v3/pkg/cluster" - "github.com/cloudreve/Cloudreve/v3/pkg/mq" - "github.com/cloudreve/Cloudreve/v3/pkg/serializer" - "github.com/cloudreve/Cloudreve/v3/pkg/task" - "github.com/cloudreve/Cloudreve/v3/pkg/util" -) - -// RecycleTask 文件回收任务 -type RecycleTask struct { - Err *task.JobError - Req *serializer.SlaveRecycleReq - MasterID string -} - -// Props 获取任务属性 -func (job *RecycleTask) Props() string { - return "" -} - -// Type 获取任务类型 -func (job *RecycleTask) Type() int { - return 0 -} - -// Creator 获取创建者ID -func (job *RecycleTask) Creator() uint { - return 0 -} - -// Model 获取任务的数据库模型 -func (job *RecycleTask) Model() *model.Task { - return nil -} - -// SetStatus 设定状态 -func (job *RecycleTask) SetStatus(status int) { -} - -// SetError 设定任务失败信息 -func (job *RecycleTask) SetError(err *task.JobError) { - job.Err = err -} - -// SetErrorMsg 设定任务失败信息 -func (job *RecycleTask) SetErrorMsg(msg string, err error) { - jobErr := &task.JobError{Msg: msg} - if err != nil { - jobErr.Error = err.Error() - } - - job.SetError(jobErr) - - notifyMsg := mq.Message{ - TriggeredBy: job.MasterID, - Event: serializer.SlaveRecycleFailed, - Content: serializer.SlaveRecycleResult{ - Error: err.Error(), - }, - } - - if err = cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil { - util.Log().Warning("无法发送回收失败通知到从机, %s", err) - } -} - -// GetError 返回任务失败信息 -func (job *RecycleTask) GetError() *task.JobError { - return job.Err -} - -// Do 开始执行任务 -func (job *RecycleTask) Do() { - err := os.RemoveAll(job.Req.Path) - if err != nil { - util.Log().Warning("无法删除中转临时文件[%s], %s", job.Req.Path, err) - job.SetErrorMsg("文件回收失败", err) - return - } - - msg := mq.Message{ - TriggeredBy: job.MasterID, - Event: serializer.SlaveRecycleSuccess, - Content: serializer.SlaveRecycleResult{}, - } - - if err = cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil { - util.Log().Warning("无法发送回收成功通知到从机, %s", err) - } -} diff --git a/routers/controllers/slave.go b/routers/controllers/slave.go index 2b5b15ce..e1e7de22 100644 --- a/routers/controllers/slave.go +++ b/routers/controllers/slave.go @@ -212,17 +212,6 @@ func SlaveCreateTransferTask(c *gin.Context) { } } -// SlaveCreateRecycleTask 从机创建回收任务 -func SlaveCreateRecycleTask(c *gin.Context) { - var service serializer.SlaveRecycleReq - if err := c.ShouldBindJSON(&service); err == nil { - res := explorer.CreateRecycleTask(c, &service) - c.JSON(200, res) - } else { - c.JSON(200, ErrorResponse(err)) - } -} - // SlaveNotificationPush 处理从机发送的消息推送 func SlaveNotificationPush(c *gin.Context) { var service node.SlaveNotificationService diff --git a/routers/router.go b/routers/router.go index f7586b3a..0727fe6f 100644 --- a/routers/router.go +++ b/routers/router.go @@ -88,7 +88,6 @@ func InitSlaveRouter() *gin.Engine { task := v3.Group("task") { task.PUT("transfer", controllers.SlaveCreateTransferTask) - task.PUT("recycle", controllers.SlaveCreateRecycleTask) } } return r diff --git a/service/aria2/manage.go b/service/aria2/manage.go index 6f55a8f9..115a4405 100644 --- a/service/aria2/manage.go +++ b/service/aria2/manage.go @@ -57,7 +57,7 @@ func (service *DownloadTaskService) Delete(c *gin.Context) serializer.Response { return serializer.Err(serializer.CodeNotFound, "Download record not found", err) } - if download.Status >= common.Error { + if download.Status >= common.Error && download.Status <= common.Unknown { // 如果任务已完成,则删除任务记录 if err := download.Delete(); err != nil { return serializer.DBErr("Failed to delete task record", err) diff --git a/service/explorer/slave.go b/service/explorer/slave.go index 253fcb87..afb61af6 100644 --- a/service/explorer/slave.go +++ b/service/explorer/slave.go @@ -166,26 +166,6 @@ func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serial return serializer.ParamErr("未知的主机节点ID", nil) } -// CreateRecycleTask 创建从机文件回收任务 -func CreateRecycleTask(c *gin.Context, req *serializer.SlaveRecycleReq) serializer.Response { - if id, ok := c.Get("MasterSiteID"); ok { - job := &slavetask.RecycleTask{ - Req: req, - MasterID: id.(string), - } - - if err := cluster.DefaultController.SubmitTask(job.MasterID, job, req.Hash(job.MasterID), func(job interface{}) { - task.TaskPoll.Submit(job.(task.Job)) - }); err != nil { - return serializer.Err(serializer.CodeCreateTaskError, "", err) - } - - return serializer.Response{} - } - - return serializer.ParamErr("未知的主机节点ID", nil) -} - // SlaveListService 从机上传会话服务 type SlaveCreateUploadSessionService struct { Session serializer.UploadSession `json:"session" binding:"required"`