You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cloudreve/pkg/aria2/monitor.go

269 lines
6.3 KiB

package aria2
import (
"context"
"encoding/json"
"errors"
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/aria2/rpc"
"github.com/HFO4/cloudreve/pkg/filesystem"
"github.com/HFO4/cloudreve/pkg/filesystem/driver/local"
"github.com/HFO4/cloudreve/pkg/filesystem/fsctx"
"github.com/HFO4/cloudreve/pkg/task"
"github.com/HFO4/cloudreve/pkg/util"
"os"
"path/filepath"
"strconv"
"time"
)
// Monitor 离线下载状态监控
type Monitor struct {
Task *model.Download
Interval time.Duration
notifier chan StatusEvent
retried int
}
// StatusEvent 状态改变事件
type StatusEvent struct {
GID string
Status int
}
var MAX_RETRY = 10
// NewMonitor 新建上传状态监控
func NewMonitor(task *model.Download) {
monitor := &Monitor{
Task: task,
Interval: time.Duration(model.GetIntSetting("aria2_interval", 10)) * time.Second,
notifier: make(chan StatusEvent),
}
go monitor.Loop()
EventNotifier.Subscribe(monitor.notifier, monitor.Task.GID)
}
// Loop 开启监控循环
func (monitor *Monitor) Loop() {
defer EventNotifier.Unsubscribe(monitor.Task.GID)
// 首次循环立即更新
interval := time.Duration(0)
for {
select {
case <-monitor.notifier:
if monitor.Update() {
return
}
case <-time.After(interval):
interval = monitor.Interval
if monitor.Update() {
return
}
}
}
}
// Update 更新状态,返回值表示是否退出监控
func (monitor *Monitor) Update() bool {
status, err := Instance.Status(monitor.Task)
if err != nil {
monitor.retried++
util.Log().Warning("无法获取下载任务[%s]的状态,%s", monitor.Task.GID, err)
// 十次重试后认定为任务失败
if monitor.retried > MAX_RETRY {
util.Log().Warning("无法获取下载任务[%s]的状态,超过最大重试次数限制,%s", monitor.Task.GID, err)
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
}
return false
}
monitor.retried = 0
// 磁力链下载需要跟随
if len(status.FollowedBy) > 0 {
util.Log().Debug("离线下载[%s]重定向至[%s]", monitor.Task.GID, status.FollowedBy[0])
monitor.Task.GID = status.FollowedBy[0]
monitor.Task.Save()
return false
}
// 更新任务信息
if err := monitor.UpdateTaskInfo(status); err != nil {
util.Log().Warning("无法更新下载任务[%s]的任务信息[%s]", monitor.Task.GID, err)
monitor.setErrorStatus(err)
return true
}
util.Log().Debug("离线下载[%s]更新状态[%s]", status.Gid, status.Status)
switch status.Status {
case "complete":
return monitor.Complete(status)
case "error":
return monitor.Error(status)
case "active", "waiting", "paused":
return false
case "removed":
monitor.Task.Status = Canceled
monitor.Task.Save()
monitor.RemoveTempFolder()
return true
default:
util.Log().Warning("下载任务[%s]返回未知状态信息[%s]", monitor.Task.GID, status.Status)
return true
}
}
// UpdateTaskInfo 更新数据库中的任务信息
func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
originSize := monitor.Task.TotalSize
monitor.Task.GID = status.Gid
monitor.Task.Status = getStatus(status.Status)
// 文件大小、已下载大小
total, err := strconv.ParseUint(status.TotalLength, 10, 64)
if err != nil {
total = 0
}
downloaded, err := strconv.ParseUint(status.CompletedLength, 10, 64)
if err != nil {
downloaded = 0
}
monitor.Task.TotalSize = total
monitor.Task.DownloadedSize = downloaded
monitor.Task.GID = status.Gid
monitor.Task.Parent = status.Dir
// 下载速度
speed, err := strconv.Atoi(status.DownloadSpeed)
if err != nil {
speed = 0
}
monitor.Task.Speed = speed
attrs, _ := json.Marshal(status)
monitor.Task.Attrs = string(attrs)
if err := monitor.Task.Save(); err != nil {
return err
}
if originSize != monitor.Task.TotalSize {
// 文件大小更新后,对文件限制等进行校验
if err := monitor.ValidateFile(); err != nil {
// 验证失败时取消任务
Instance.Cancel(monitor.Task)
return err
}
}
return nil
}
// ValidateFile 上传过程中校验文件大小、文件名
func (monitor *Monitor) ValidateFile() error {
// 找到任务创建者
user := monitor.Task.GetOwner()
if user == nil {
return ErrUserNotFound
}
// 创建文件系统
fs, err := filesystem.NewFileSystem(user)
if err != nil {
return err
}
defer fs.Recycle()
// 创建上下文环境
ctx := context.WithValue(context.Background(), fsctx.FileHeaderCtx, local.FileStream{
Size: monitor.Task.TotalSize,
})
// 验证用户容量
if err := filesystem.HookValidateCapacityWithoutIncrease(ctx, fs); err != nil {
return err
}
// 验证每个文件
for _, fileInfo := range monitor.Task.StatusInfo.Files {
if fileInfo.Selected == "true" {
// 创建上下文环境
fileSize, _ := strconv.ParseUint(fileInfo.Length, 10, 64)
ctx := context.WithValue(context.Background(), fsctx.FileHeaderCtx, local.FileStream{
Size: fileSize,
Name: filepath.Base(fileInfo.Path),
})
if err := filesystem.HookValidateFile(ctx, fs); err != nil {
return err
}
}
}
return nil
}
// Error 任务下载出错处理,返回是否中断监控
func (monitor *Monitor) Error(status rpc.StatusInfo) bool {
monitor.setErrorStatus(errors.New(status.ErrorMessage))
// 清理临时文件
monitor.RemoveTempFolder()
return true
}
// RemoveTempFolder 清理下载临时目录
func (monitor *Monitor) RemoveTempFolder() {
err := os.RemoveAll(monitor.Task.Parent)
if err != nil {
util.Log().Warning("无法删除离线下载临时目录[%s], %s", monitor.Task.Parent, err)
}
}
// Complete 完成下载,返回是否中断监控
func (monitor *Monitor) Complete(status rpc.StatusInfo) bool {
// 创建中转任务
file := make([]string, 0, len(monitor.Task.StatusInfo.Files))
for i := 0; i < len(monitor.Task.StatusInfo.Files); i++ {
if monitor.Task.StatusInfo.Files[i].Selected == "true" {
file = append(file, monitor.Task.StatusInfo.Files[i].Path)
}
}
job, err := task.NewTransferTask(
monitor.Task.UserID,
file,
monitor.Task.Dst,
monitor.Task.Parent,
)
if err != nil {
monitor.setErrorStatus(err)
return true
}
// 提交中转任务
task.TaskPoll.Submit(job)
// 更新任务ID
monitor.Task.TaskID = job.Model().ID
monitor.Task.Save()
return true
}
func (monitor *Monitor) setErrorStatus(err error) {
monitor.Task.Status = Error
monitor.Task.Error = err.Error()
monitor.Task.Save()
}