|
|
package monitor
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
"path/filepath"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
model "github.com/cloudreve/Cloudreve/v3/models"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/task"
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/util"
|
|
|
)
|
|
|
|
|
|
// Monitor 离线下载状态监控
|
|
|
type Monitor struct {
|
|
|
Task *model.Download
|
|
|
Interval time.Duration
|
|
|
|
|
|
notifier <-chan mq.Message
|
|
|
node cluster.Node
|
|
|
retried int
|
|
|
}
|
|
|
|
|
|
var MAX_RETRY = 10
|
|
|
|
|
|
// NewMonitor 新建离线下载状态监控
|
|
|
func NewMonitor(task *model.Download, pool cluster.Pool, mqClient mq.MQ) {
|
|
|
monitor := &Monitor{
|
|
|
Task: task,
|
|
|
notifier: make(chan mq.Message),
|
|
|
node: pool.GetNodeByID(task.GetNodeID()),
|
|
|
}
|
|
|
|
|
|
if monitor.node != nil {
|
|
|
monitor.Interval = time.Duration(monitor.node.GetAria2Instance().GetConfig().Interval) * time.Second
|
|
|
go monitor.Loop(mqClient)
|
|
|
|
|
|
monitor.notifier = mqClient.Subscribe(monitor.Task.GID, 0)
|
|
|
} else {
|
|
|
monitor.setErrorStatus(errors.New("节点不可用"))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Loop 开启监控循环
|
|
|
func (monitor *Monitor) Loop(mqClient mq.MQ) {
|
|
|
defer mqClient.Unsubscribe(monitor.Task.GID, monitor.notifier)
|
|
|
|
|
|
// 首次循环立即更新
|
|
|
interval := 50 * time.Millisecond
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
case <-monitor.notifier:
|
|
|
if monitor.Update() {
|
|
|
return
|
|
|
}
|
|
|
case <-time.After(interval):
|
|
|
interval = monitor.Interval
|
|
|
if monitor.Update() {
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Update 更新状态,返回值表示是否退出监控
|
|
|
func (monitor *Monitor) Update() bool {
|
|
|
status, err := monitor.node.GetAria2Instance().Status(monitor.Task)
|
|
|
|
|
|
if err != nil {
|
|
|
monitor.retried++
|
|
|
util.Log().Warning("无法获取下载任务[%s]的状态,%s", monitor.Task.GID, err)
|
|
|
|
|
|
// 十次重试后认定为任务失败
|
|
|
if monitor.retried > MAX_RETRY {
|
|
|
util.Log().Warning("无法获取下载任务[%s]的状态,超过最大重试次数限制,%s", monitor.Task.GID, err)
|
|
|
monitor.setErrorStatus(err)
|
|
|
monitor.RemoveTempFolder()
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
return false
|
|
|
}
|
|
|
monitor.retried = 0
|
|
|
|
|
|
// 磁力链下载需要跟随
|
|
|
if len(status.FollowedBy) > 0 {
|
|
|
util.Log().Debug("离线下载[%s]重定向至[%s]", monitor.Task.GID, status.FollowedBy[0])
|
|
|
monitor.Task.GID = status.FollowedBy[0]
|
|
|
monitor.Task.Save()
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
// 更新任务信息
|
|
|
if err := monitor.UpdateTaskInfo(status); err != nil {
|
|
|
util.Log().Warning("无法更新下载任务[%s]的任务信息[%s],", monitor.Task.GID, err)
|
|
|
monitor.setErrorStatus(err)
|
|
|
monitor.RemoveTempFolder()
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
util.Log().Debug("离线下载[%s]更新状态[%s]", status.Gid, status.Status)
|
|
|
|
|
|
switch status.Status {
|
|
|
case "complete":
|
|
|
return monitor.Complete(task.TaskPoll)
|
|
|
case "error":
|
|
|
return monitor.Error(status)
|
|
|
case "active", "waiting", "paused":
|
|
|
return false
|
|
|
case "removed":
|
|
|
monitor.Task.Status = common.Canceled
|
|
|
monitor.Task.Save()
|
|
|
monitor.RemoveTempFolder()
|
|
|
return true
|
|
|
default:
|
|
|
util.Log().Warning("下载任务[%s]返回未知状态信息[%s],", monitor.Task.GID, status.Status)
|
|
|
return true
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// UpdateTaskInfo 更新数据库中的任务信息
|
|
|
func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
|
|
|
originSize := monitor.Task.TotalSize
|
|
|
|
|
|
monitor.Task.GID = status.Gid
|
|
|
monitor.Task.Status = common.GetStatus(status.Status)
|
|
|
|
|
|
// 文件大小、已下载大小
|
|
|
total, err := strconv.ParseUint(status.TotalLength, 10, 64)
|
|
|
if err != nil {
|
|
|
total = 0
|
|
|
}
|
|
|
downloaded, err := strconv.ParseUint(status.CompletedLength, 10, 64)
|
|
|
if err != nil {
|
|
|
downloaded = 0
|
|
|
}
|
|
|
monitor.Task.TotalSize = total
|
|
|
monitor.Task.DownloadedSize = downloaded
|
|
|
monitor.Task.GID = status.Gid
|
|
|
monitor.Task.Parent = status.Dir
|
|
|
|
|
|
// 下载速度
|
|
|
speed, err := strconv.Atoi(status.DownloadSpeed)
|
|
|
if err != nil {
|
|
|
speed = 0
|
|
|
}
|
|
|
|
|
|
monitor.Task.Speed = speed
|
|
|
attrs, _ := json.Marshal(status)
|
|
|
monitor.Task.Attrs = string(attrs)
|
|
|
|
|
|
if err := monitor.Task.Save(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
if originSize != monitor.Task.TotalSize {
|
|
|
// 文件大小更新后,对文件限制等进行校验
|
|
|
if err := monitor.ValidateFile(); err != nil {
|
|
|
// 验证失败时取消任务
|
|
|
monitor.node.GetAria2Instance().Cancel(monitor.Task)
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// ValidateFile 上传过程中校验文件大小、文件名
|
|
|
func (monitor *Monitor) ValidateFile() error {
|
|
|
// 找到任务创建者
|
|
|
user := monitor.Task.GetOwner()
|
|
|
if user == nil {
|
|
|
return common.ErrUserNotFound
|
|
|
}
|
|
|
|
|
|
// 创建文件系统
|
|
|
fs, err := filesystem.NewFileSystem(user)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
defer fs.Recycle()
|
|
|
|
|
|
// 创建上下文环境
|
|
|
file := &fsctx.FileStream{
|
|
|
Size: monitor.Task.TotalSize,
|
|
|
}
|
|
|
|
|
|
// 验证用户容量
|
|
|
if err := filesystem.HookValidateCapacity(context.Background(), fs, file); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
// 验证每个文件
|
|
|
for _, fileInfo := range monitor.Task.StatusInfo.Files {
|
|
|
if fileInfo.Selected == "true" {
|
|
|
// 创建上下文环境
|
|
|
fileSize, _ := strconv.ParseUint(fileInfo.Length, 10, 64)
|
|
|
file := &fsctx.FileStream{
|
|
|
Size: fileSize,
|
|
|
Name: filepath.Base(fileInfo.Path),
|
|
|
}
|
|
|
if err := filesystem.HookValidateFile(context.Background(), fs, file); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// Error 任务下载出错处理,返回是否中断监控
|
|
|
func (monitor *Monitor) Error(status rpc.StatusInfo) bool {
|
|
|
monitor.setErrorStatus(errors.New(status.ErrorMessage))
|
|
|
|
|
|
// 清理临时文件
|
|
|
monitor.RemoveTempFolder()
|
|
|
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
// RemoveTempFolder 清理下载临时目录
|
|
|
func (monitor *Monitor) RemoveTempFolder() {
|
|
|
monitor.node.GetAria2Instance().DeleteTempFile(monitor.Task)
|
|
|
}
|
|
|
|
|
|
// Complete 完成下载,返回是否中断监控
|
|
|
func (monitor *Monitor) Complete(pool task.Pool) bool {
|
|
|
// 创建中转任务
|
|
|
file := make([]string, 0, len(monitor.Task.StatusInfo.Files))
|
|
|
sizes := make(map[string]uint64, len(monitor.Task.StatusInfo.Files))
|
|
|
for i := 0; i < len(monitor.Task.StatusInfo.Files); i++ {
|
|
|
fileInfo := monitor.Task.StatusInfo.Files[i]
|
|
|
if fileInfo.Selected == "true" {
|
|
|
file = append(file, fileInfo.Path)
|
|
|
size, _ := strconv.ParseUint(fileInfo.Length, 10, 64)
|
|
|
sizes[fileInfo.Path] = size
|
|
|
}
|
|
|
}
|
|
|
|
|
|
job, err := task.NewTransferTask(
|
|
|
monitor.Task.UserID,
|
|
|
file,
|
|
|
monitor.Task.Dst,
|
|
|
monitor.Task.Parent,
|
|
|
true,
|
|
|
monitor.node.ID(),
|
|
|
sizes,
|
|
|
)
|
|
|
if err != nil {
|
|
|
monitor.setErrorStatus(err)
|
|
|
monitor.RemoveTempFolder()
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
// 提交中转任务
|
|
|
pool.Submit(job)
|
|
|
|
|
|
// 更新任务ID
|
|
|
monitor.Task.TaskID = job.Model().ID
|
|
|
monitor.Task.Save()
|
|
|
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
func (monitor *Monitor) setErrorStatus(err error) {
|
|
|
monitor.Task.Status = common.Error
|
|
|
monitor.Task.Error = err.Error()
|
|
|
monitor.Task.Save()
|
|
|
}
|