Fix: panics inside tasks were not correctly logged into the DB

Feat: slave node uses new API to upload files to master
pull/1107/head
HFO4 3 years ago
parent 9136f3caec
commit c89327631e

@@ -306,9 +306,9 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst string) error {
 		err = fs.UploadFromStream(ctx, &fsctx.FileStream{
 			File:        fileStream,
 			Size:        uint64(size),
-			Name:        path.Base(dst),
-			VirtualPath: path.Dir(dst),
-		})
+			Name:        path.Base(savePath),
+			VirtualPath: path.Dir(savePath),
+		}, true)
 		fileStream.Close()
 		if err != nil {
 			util.Log().Debug("无法上传压缩包内的文件%s , %s , 跳过", rawPath, err)

@@ -57,6 +57,7 @@ func NewClient(policy *model.Policy) (Client, error) {
 			request.WithEndpoint(serverURL.ResolveReference(base).String()),
 			request.WithCredential(authInstance, int64(signTTL)),
 			request.WithMasterMeta(),
+			request.WithSlaveMeta(policy.AccessKey),
 		),
 	}, nil
 }
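The added request.WithSlaveMeta(policy.AccessKey) follows the same functional-options style as the WithEndpoint, WithCredential, and WithMasterMeta calls around it. For reference only, a minimal self-contained sketch of that pattern is below; the types, option names, and header are made-up stand-ins, not the real request package API.

    package main

    import (
    	"fmt"
    	"net/http"
    )

    // httpClient and clientOption are hypothetical stand-ins for the real client.
    type httpClient struct {
    	endpoint string
    	header   http.Header
    }

    type clientOption func(*httpClient)

    func withEndpoint(u string) clientOption {
    	return func(c *httpClient) { c.endpoint = u }
    }

    // withSlaveMeta attaches the node's access key so the receiving side can tell
    // which slave the request came from (the header name here is illustrative).
    func withSlaveMeta(accessKey string) clientOption {
    	return func(c *httpClient) { c.header.Set("X-Example-Slave-Key", accessKey) }
    }

    func newClient(opts ...clientOption) *httpClient {
    	c := &httpClient{header: make(http.Header)}
    	for _, opt := range opts {
    		opt(c)
    	}
    	return c
    }

    func main() {
    	c := newClient(withEndpoint("https://master.example.com"), withSlaveMeta("demo-access-key"))
    	fmt.Println(c.endpoint, c.header.Get("X-Example-Slave-Key"))
    }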

@@ -52,14 +52,10 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy)
 func (d *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
 	defer file.Close()
-	src, ok := ctx.Value(fsctx.SlaveSrcPath).(string)
-	if !ok {
-		return ErrSlaveSrcPathNotExist
-	}
+	fileInfo := file.Info()
 	req := serializer.SlaveTransferReq{
-		Src:    src,
-		Dst:    file.Info().SavePath,
+		Src:    fileInfo.Src,
+		Dst:    fileInfo.SavePath,
 		Policy: d.policy,
 	}

@@ -26,6 +26,7 @@ type UploadTaskInfo struct {
 	UploadSessionID *string
 	AppendStart     uint64
 	Model           interface{}
+	Src             string
 }

 // FileHeader is the handler for uploaded file data
@@ -54,14 +55,23 @@ type FileStream struct {
 	UploadSessionID *string
 	AppendStart     uint64
 	Model           interface{}
+	Src             string
 }

 func (file *FileStream) Read(p []byte) (n int, err error) {
-	return file.File.Read(p)
+	if file.File != nil {
+		return file.File.Read(p)
+	}
+
+	return 0, io.EOF
 }

 func (file *FileStream) Close() error {
-	return file.File.Close()
+	if file.File != nil {
+		return file.File.Close()
+	}
+
+	return nil
 }

 func (file *FileStream) Seek(offset int64, whence int) (int64, error) {
@@ -85,6 +95,7 @@ func (file *FileStream) Info() *UploadTaskInfo {
 		UploadSessionID: file.UploadSessionID,
 		AppendStart:     file.AppendStart,
 		Model:           file.Model,
+		Src:             file.Src,
 	}
 }
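With Read and Close now tolerating a nil File, a FileStream can carry metadata only: File left nil and the new Src field pointing at a physical file already on the node, which is how the transfer task further down hands work to the slave driver. A rough runnable illustration of that contract, with the type trimmed to the fields involved and a made-up path:

    package main

    import (
    	"fmt"
    	"io"
    )

    // fileStream is a trimmed stand-in for fsctx.FileStream, keeping only the
    // fields that matter for the metadata-only (File == nil) case.
    type fileStream struct {
    	File io.ReadCloser
    	Src  string
    	Name string
    }

    func (f *fileStream) Read(p []byte) (int, error) {
    	if f.File != nil {
    		return f.File.Read(p)
    	}
    	return 0, io.EOF
    }

    func (f *fileStream) Close() error {
    	if f.File != nil {
    		return f.File.Close()
    	}
    	return nil
    }

    func main() {
    	// No readable body; Src names the file already sitting on the node's disk.
    	s := &fileStream{File: nil, Src: "/tmp/transfer/archive.zip", Name: "archive.zip"}
    	n, err := s.Read(make([]byte, 8))
    	fmt.Println(n, err == io.EOF) // 0 true
    	fmt.Println(s.Close())        // <nil>
    }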

@@ -224,7 +224,16 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS
 }

 // UploadFromStream uploads a file from a file stream
-func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStream) error {
+func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStream, resetPolicy bool) error {
+	if resetPolicy {
+		// Reset the storage policy
+		fs.Policy = &fs.User.Policy
+		err := fs.DispatchHandler()
+		if err != nil {
+			return err
+		}
+	}
+
 	// Assign hooks to the filesystem
 	fs.Lock.Lock()
 	if fs.Hooks == nil {
@@ -242,16 +251,7 @@ func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStre
 }

 // UploadFromPath uploads an existing local file into the user's filesystem
-func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, resetPolicy bool, mode fsctx.WriteMode) error {
-	// Reset the storage policy
-	if resetPolicy {
-		fs.Policy = &fs.User.Policy
-		err := fs.DispatchHandler()
-		if err != nil {
-			return err
-		}
-	}
-
+func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, mode fsctx.WriteMode) error {
 	file, err := os.Open(util.RelativePath(src))
 	if err != nil {
 		return err
@@ -273,5 +273,5 @@ func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, reset
 		Name:        path.Base(dst),
 		VirtualPath: path.Dir(dst),
 		Mode:        mode,
-	})
+	}, true)
 }
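Taken together, these two hunks move the resetPolicy decision from UploadFromPath into UploadFromStream: path-based uploads now always reset to the owner's policy (the true literal above), while stream-based callers choose per call. A toy, self-contained model of that control flow follows; the types are simplified stand-ins, not the real FileSystem.

    package main

    import "fmt"

    // Simplified stand-ins for the real FileSystem and policy types.
    type policy struct{ name string }

    type fileSystem struct {
    	userPolicy policy  // the owner's storage policy
    	active     *policy // the policy the handler is currently dispatched for
    }

    func (fs *fileSystem) dispatchHandler() error {
    	fmt.Println("dispatching handler for policy:", fs.active.name)
    	return nil
    }

    // uploadFromStream mirrors the new signature: the caller decides whether the
    // filesystem should fall back to the owner's policy before uploading.
    func (fs *fileSystem) uploadFromStream(resetPolicy bool) error {
    	if resetPolicy {
    		fs.active = &fs.userPolicy
    		if err := fs.dispatchHandler(); err != nil {
    			return err
    		}
    	}
    	fmt.Println("uploading via policy:", fs.active.name)
    	return nil
    }

    func main() {
    	fs := &fileSystem{userPolicy: policy{name: "local"}}

    	// A transfer task keeps the handler it already switched to a slave node:
    	fs.active = &policy{name: "slave-node"}
    	_ = fs.uploadFromStream(false)

    	// A regular upload resets to the owner's policy first:
    	_ = fs.uploadFromStream(true)
    }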

@@ -106,7 +106,7 @@ func (job *CompressTask) Do() {
 	job.TaskModel.SetProgress(TransferringProgress)

 	// Upload the file
-	err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, true, 0)
+	err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, 0)
 	if err != nil {
 		job.SetErrorMsg(err.Error())
 		return

@@ -117,16 +117,18 @@ func (job *TransferTask) Do() {
 		}

 		// Switch to the slave node to handle the upload
+		fs.SetPolicyFromPath(path.Dir(dst))
 		fs.SwitchToSlaveHandler(node)
 		err = fs.UploadFromStream(context.Background(), &fsctx.FileStream{
 			File:        nil,
 			Size:        job.TaskProps.SrcSizes[file],
 			Name:        path.Base(dst),
 			VirtualPath: path.Dir(dst),
-		})
+			Src:         file,
+		}, false)
 	} else {
 		// Relay through the master node
-		err = fs.UploadFromPath(context.Background(), file, dst, true, 0)
+		err = fs.UploadFromPath(context.Background(), file, dst, 0)
 	}

 	if err != nil {

@@ -1,6 +1,9 @@
 package task

-import "github.com/cloudreve/Cloudreve/v3/pkg/util"
+import (
+	"fmt"
+	"github.com/cloudreve/Cloudreve/v3/pkg/util"
+)

 // Worker is the object that processes tasks
 type Worker interface {
@@ -20,7 +23,7 @@ func (worker *GeneralWorker) Do(job Job) {
 		// Catch fatal errors
 		if err := recover(); err != nil {
 			util.Log().Debug("任务执行出错,%s", err)
-			job.SetError(&JobError{Msg: "致命错误"})
+			job.SetError(&JobError{Msg: "致命错误", Error: fmt.Sprintf("%s", err)})
 			job.SetStatus(Error)
 		}
 	}()
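This is the fix named in the commit message: the recovered panic value is now rendered with fmt.Sprintf and stored on the job's error record instead of being dropped after the debug log, so it can reach the task record in the DB. A small runnable illustration of capturing a recovered value this way, with JobError replaced by a plain local struct:

    package main

    import "fmt"

    // jobError is a stand-in for the task package's JobError.
    type jobError struct {
    	Msg   string
    	Error string
    }

    func runJob(job func()) (captured *jobError) {
    	defer func() {
    		// Catch the panic and keep its rendered value, mirroring the diff above.
    		if err := recover(); err != nil {
    			captured = &jobError{Msg: "fatal error", Error: fmt.Sprintf("%s", err)}
    		}
    	}()
    	job()
    	return nil
    }

    func main() {
    	e := runJob(func() { panic("decompress failed: unexpected EOF") })
    	fmt.Printf("%+v\n", *e) // {Msg:fatal error Error:decompress failed: unexpected EOF}
    }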

@@ -219,7 +219,15 @@ func InitMasterRouter() *gin.Engine {
 			// Event notifications
 			slave.PUT("notification/:subject", controllers.SlaveNotificationPush)
 			// Upload
-			slave.POST("upload", controllers.SlaveUpload)
+			upload := slave.Group("upload")
+			{
+				// Upload a chunk
+				upload.POST(":sessionId", controllers.SlaveUpload)
+				// Create an upload session
+				upload.PUT("", controllers.SlaveGetUploadSession)
+				// Delete an upload session
+				upload.DELETE(":sessionId", controllers.SlaveDeleteUploadSession)
+			}
 			// OneDrive storage policy credentials
 			slave.GET("credential/onedrive/:id", controllers.SlaveGetOneDriveCredential)
 		}
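The single slave.POST("upload") endpoint becomes a small session-based group: PUT creates an upload session, POST :sessionId receives chunks (the chunk index arrives as a query parameter, see the service change below), and DELETE :sessionId discards the session. A rough runnable sketch of that route shape using only the standard library (Go 1.22+ ServeMux instead of gin; paths and handler bodies are illustrative, not the real controllers):

    package main

    import (
    	"fmt"
    	"io"
    	"net/http"
    	"net/http/httptest"
    )

    func main() {
    	mux := http.NewServeMux()
    	mux.HandleFunc("PUT /upload", func(w http.ResponseWriter, r *http.Request) {
    		fmt.Fprint(w, "create upload session")
    	})
    	mux.HandleFunc("POST /upload/{sessionId}", func(w http.ResponseWriter, r *http.Request) {
    		fmt.Fprintf(w, "chunk %s for session %s", r.URL.Query().Get("chunk"), r.PathValue("sessionId"))
    	})
    	mux.HandleFunc("DELETE /upload/{sessionId}", func(w http.ResponseWriter, r *http.Request) {
    		fmt.Fprintf(w, "delete session %s", r.PathValue("sessionId"))
    	})

    	srv := httptest.NewServer(mux)
    	defer srv.Close()

    	req, _ := http.NewRequest(http.MethodPost, srv.URL+"/upload/abc123?chunk=0", nil)
    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		panic(err)
    	}
    	body, _ := io.ReadAll(resp.Body)
    	resp.Body.Close()
    	fmt.Println(resp.Status, string(body)) // 200 OK chunk 0 for session abc123
    }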

@@ -7,6 +7,7 @@ import (
 	"github.com/cloudreve/Cloudreve/v3/pkg/auth"
 	"github.com/cloudreve/Cloudreve/v3/pkg/cache"
 	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
+	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/local"
 	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
 	"github.com/cloudreve/Cloudreve/v3/pkg/hashid"
 	"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
@@ -137,6 +138,8 @@ func (service *UploadService) SlaveUpload(ctx context.Context, c *gin.Context) s
 		return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
 	}

+	fs.Handler = local.Driver{}
+
 	// Parse the required parameters
 	service.Index, _ = strconv.Atoi(c.Query("chunk"))
 	mode := fsctx.Append
