Feat: client method to upload file from master node to slave node

pull/1107/head
HFO4 3 years ago
parent 081e75146c
commit b96019be7c

@ -125,6 +125,7 @@ func addDefaultSettings() {
{Name: "onedrive_callback_check", Value: `20`, Type: "timeout"}, {Name: "onedrive_callback_check", Value: `20`, Type: "timeout"},
{Name: "folder_props_timeout", Value: `300`, Type: "timeout"}, {Name: "folder_props_timeout", Value: `300`, Type: "timeout"},
{Name: "onedrive_chunk_retries", Value: `1`, Type: "retry"}, {Name: "onedrive_chunk_retries", Value: `1`, Type: "retry"},
{Name: "slave_chunk_retries", Value: `1`, Type: "retry"},
{Name: "onedrive_source_timeout", Value: `1800`, Type: "timeout"}, {Name: "onedrive_source_timeout", Value: `1800`, Type: "timeout"},
{Name: "reset_after_upload_failed", Value: `0`, Type: "upload"}, {Name: "reset_after_upload_failed", Value: `0`, Type: "upload"},
{Name: "login_captcha", Value: `0`, Type: "login"}, {Name: "login_captcha", Value: `0`, Type: "login"},

@ -44,7 +44,7 @@ func (node *SlaveNode) Init(nodeModel *model.Node) {
var endpoint *url.URL var endpoint *url.URL
if serverURL, err := url.Parse(node.Model.Server); err == nil { if serverURL, err := url.Parse(node.Model.Server); err == nil {
var controller *url.URL var controller *url.URL
controller, _ = url.Parse("/api/v3/slave") controller, _ = url.Parse("/api/v3/slave/")
endpoint = serverURL.ResolveReference(controller) endpoint = serverURL.ResolveReference(controller)
} }

@ -3,19 +3,26 @@ package remote
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/retry"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/gofrs/uuid"
"io"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
"strings" "strings"
"time"
) )
const ( const (
basePath = "/api/v3/slave" basePath = "/api/v3/slave/"
OverwriteHeader = auth.CrHeaderPrefix + "Overwrite" OverwriteHeader = auth.CrHeaderPrefix + "Overwrite"
chunkRetrySleep = time.Duration(5) * time.Second
) )
// Client to operate remote slave server // Client to operate remote slave server
@ -24,6 +31,8 @@ type Client interface {
CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error
// GetUploadURL signs an url for uploading file // GetUploadURL signs an url for uploading file
GetUploadURL(ttl int64, sessionID string) (string, string, error) GetUploadURL(ttl int64, sessionID string) (string, string, error)
// Upload uploads file to remote server
Upload(ctx context.Context, file fsctx.FileHeader) error
} }
// NewClient creates new Client from given policy // NewClient creates new Client from given policy
@ -54,6 +63,62 @@ type remoteClient struct {
httpClient request.Client httpClient request.Client
} }
func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error {
ttl := model.GetIntSetting("upload_session_timeout", 86400)
fileInfo := file.Info()
session := &serializer.UploadSession{
Key: uuid.Must(uuid.NewV4()).String(),
VirtualPath: fileInfo.VirtualPath,
Name: fileInfo.FileName,
Size: fileInfo.Size,
SavePath: fileInfo.SavePath,
LastModified: fileInfo.LastModified,
Policy: *c.policy,
}
// Create upload session
if err := c.CreateUploadSession(ctx, session, int64(ttl)); err != nil {
return fmt.Errorf("failed to create upload session: %w", err)
}
overwrite := fileInfo.Mode&fsctx.Overwrite == fsctx.Overwrite
// Upload chunks
offset := uint64(0)
chunkSize := session.Policy.OptionsSerialized.ChunkSize
if chunkSize == 0 {
chunkSize = fileInfo.Size
}
chunkNum := fileInfo.Size / chunkSize
if fileInfo.Size%chunkSize != 0 {
chunkNum++
}
for i := 0; i < int(chunkNum); i++ {
uploadFunc := func(index int, chunk io.Reader) error {
contentLength := chunkSize
if index == int(chunkNum-1) {
contentLength = fileInfo.Size - chunkSize*(chunkNum-1)
}
return c.uploadChunk(ctx, session.Key, index, chunk, overwrite, contentLength)
}
if err := retry.Chunk(i, chunkSize, file, uploadFunc, retry.ConstantBackoff{
Max: model.GetIntSetting("onedrive_chunk_retries", 1),
Sleep: chunkRetrySleep,
}); err != nil {
// TODO 删除上传会话
return fmt.Errorf("failed to upload chunk #%d: %w", i, err)
}
offset += chunkSize
}
return nil
}
func (c *remoteClient) CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error { func (c *remoteClient) CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error {
reqBodyEncoded, err := json.Marshal(map[string]interface{}{ reqBodyEncoded, err := json.Marshal(map[string]interface{}{
"session": session, "session": session,
@ -96,3 +161,24 @@ func (c *remoteClient) GetUploadURL(ttl int64, sessionID string) (string, string
req = auth.SignRequest(c.authInstance, req, ttl) req = auth.SignRequest(c.authInstance, req, ttl)
return req.URL.String(), req.Header["Authorization"][0], nil return req.URL.String(), req.Header["Authorization"][0], nil
} }
func (c *remoteClient) uploadChunk(ctx context.Context, sessionID string, index int, chunk io.Reader, overwrite bool, size uint64) error {
resp, err := c.httpClient.Request(
"POST",
fmt.Sprintf("upload/%s?chunk=%d", sessionID, index),
io.LimitReader(chunk, int64(size)),
request.WithContext(ctx),
request.WithTimeout(time.Duration(0)),
request.WithContentLength(int64(size)),
request.WithHeader(map[string][]string{OverwriteHeader: {fmt.Sprintf("%t", overwrite)}}),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if resp.Code != 0 {
return serializer.NewErrorFromResponse(resp)
}
return nil
}

@ -153,55 +153,7 @@ func (handler *Driver) Get(ctx context.Context, path string) (response.RSCloser,
func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error { func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
defer file.Close() defer file.Close()
// 凭证有效期 return handler.client.Upload(ctx, file)
credentialTTL := model.GetIntSetting("upload_credential_timeout", 3600)
// 生成上传策略
fileInfo := file.Info()
policy := serializer.UploadPolicy{
SavePath: path.Dir(fileInfo.SavePath),
FileName: path.Base(fileInfo.FileName),
AutoRename: false,
MaxSize: fileInfo.Size,
}
credential, err := handler.getUploadCredential(ctx, policy, int64(credentialTTL))
if err != nil {
return err
}
// 对文件名进行URLEncode
fileName := url.QueryEscape(path.Base(fileInfo.SavePath))
// 决定是否要禁用文件覆盖
overwrite := "false"
if fileInfo.Mode&fsctx.Overwrite == fsctx.Overwrite {
overwrite = "true"
}
// 上传文件
resp, err := handler.Client.Request(
"POST",
handler.Policy.GetUploadURL(),
file,
request.WithHeader(map[string][]string{
"X-Cr-Policy": {credential.Policy},
"X-Cr-FileName": {fileName},
"X-Cr-Overwrite": {overwrite},
}),
request.WithContentLength(int64(fileInfo.Size)),
request.WithTimeout(time.Duration(0)),
request.WithMasterMeta(),
request.WithSlaveMeta(handler.Policy.AccessKey),
request.WithCredential(handler.AuthInstance, int64(credentialTTL)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if resp.Code != 0 {
return errors.New(resp.Msg)
}
return nil
} }
// Delete 删除一个或多个文件, // Delete 删除一个或多个文件,

@ -30,7 +30,7 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy)
var endpoint *url.URL var endpoint *url.URL
if serverURL, err := url.Parse(node.DBModel().Server); err == nil { if serverURL, err := url.Parse(node.DBModel().Server); err == nil {
var controller *url.URL var controller *url.URL
controller, _ = url.Parse("/api/v3/slave") controller, _ = url.Parse("/api/v3/slave/")
endpoint = serverURL.ResolveReference(controller) endpoint = serverURL.ResolveReference(controller)
} }

@ -32,9 +32,11 @@ type UploadTaskInfo struct {
type FileHeader interface { type FileHeader interface {
io.Reader io.Reader
io.Closer io.Closer
io.Seeker
Info() *UploadTaskInfo Info() *UploadTaskInfo
SetSize(uint64) SetSize(uint64)
SetModel(fileModel interface{}) SetModel(fileModel interface{})
Seekable() bool
} }
// FileStream 用户传来的文件 // FileStream 用户传来的文件
@ -43,6 +45,7 @@ type FileStream struct {
LastModified *time.Time LastModified *time.Time
Metadata map[string]string Metadata map[string]string
File io.ReadCloser File io.ReadCloser
Seeker io.Seeker
Size uint64 Size uint64
VirtualPath string VirtualPath string
Name string Name string
@ -61,6 +64,14 @@ func (file *FileStream) Close() error {
return file.File.Close() return file.File.Close()
} }
func (file *FileStream) Seek(offset int64, whence int) (int64, error) {
return file.Seeker.Seek(offset, whence)
}
func (file *FileStream) Seekable() bool {
return file.Seeker != nil
}
func (file *FileStream) Info() *UploadTaskInfo { func (file *FileStream) Info() *UploadTaskInfo {
return &UploadTaskInfo{ return &UploadTaskInfo{
Size: file.Size, Size: file.Size,

@ -261,7 +261,8 @@ func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, reset
// 开始上传 // 开始上传
return fs.UploadFromStream(ctx, &fsctx.FileStream{ return fs.UploadFromStream(ctx, &fsctx.FileStream{
File: nil, File: file,
Seeker: file,
Size: uint64(size), Size: uint64(size),
Name: path.Base(dst), Name: path.Base(dst),
VirtualPath: path.Dir(dst), VirtualPath: path.Dir(dst),

@ -5,6 +5,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/auth"
"net/http" "net/http"
"net/url" "net/url"
"strings"
"time" "time"
) )
@ -103,6 +104,10 @@ func WithSlaveMeta(s string) Option {
// Endpoint 使用同一的请求Endpoint // Endpoint 使用同一的请求Endpoint
func WithEndpoint(endpoint string) Option { func WithEndpoint(endpoint string) Option {
if !strings.HasSuffix(endpoint, "/") {
endpoint += "/"
}
endpointURL, _ := url.Parse(endpoint) endpointURL, _ := url.Parse(endpoint)
return optionFunc(func(o *options) { return optionFunc(func(o *options) {
o.endpoint = endpointURL o.endpoint = endpointURL

@ -7,7 +7,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path" "net/url"
"strings" "strings"
"sync" "sync"
@ -70,9 +70,13 @@ func (c *HTTPClient) Request(method, target string, body io.Reader, opts ...Opti
// 确定请求URL // 确定请求URL
if options.endpoint != nil { if options.endpoint != nil {
targetPath, err := url.Parse(target)
if err != nil {
return &Response{Err: err}
}
targetURL := *options.endpoint targetURL := *options.endpoint
targetURL.Path = path.Join(targetURL.Path, target) target = targetURL.ResolveReference(targetPath).String()
target = targetURL.String()
} }
// 创建请求 // 创建请求

@ -0,0 +1,26 @@
package retry
import "time"
// Backoff used for retry sleep backoff
type Backoff interface {
Next() bool
}
// ConstantBackoff implements Backoff interface with constant sleep time
type ConstantBackoff struct {
Sleep time.Duration
Max int
tried int
}
func (c ConstantBackoff) Next() bool {
c.tried++
if c.tried >= c.Max {
return false
}
time.Sleep(c.Sleep)
return true
}

@ -0,0 +1,29 @@
package retry
import (
"context"
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"io"
)
type ChunkProcessFunc func(index int, chunk io.Reader) error
func Chunk(index int, chunkSize uint64, file fsctx.FileHeader, processor ChunkProcessFunc, backoff Backoff) error {
err := processor(index, file)
if err != nil {
if err != context.Canceled && file.Seekable() && backoff.Next() {
if _, seekErr := file.Seek(int64(uint64(index)*chunkSize), io.SeekStart); err != nil {
return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err)
}
util.Log().Debug("Retrying chunk %d, last error: %s", index, err)
return Chunk(index, chunkSize, file, processor, backoff)
}
return err
}
return nil
}

@ -87,13 +87,13 @@ func (service *UploadService) LocalUpload(ctx context.Context, c *gin.Context) s
} }
if uploadSession.UID != fs.User.ID { if uploadSession.UID != fs.User.ID {
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session expired or not exist", nil) return serializer.Err(serializer.CodeUploadSessionExpired, "Local upload session expired or not exist", nil)
} }
// 查找上传会话创建的占位文件 // 查找上传会话创建的占位文件
file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID) file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID)
if err != nil { if err != nil {
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session file placeholder not exist", err) return serializer.Err(serializer.CodeUploadSessionExpired, "Local upload session file placeholder not exist", err)
} }
// 重设 fs 存储策略 // 重设 fs 存储策略
@ -127,7 +127,7 @@ func (service *UploadService) LocalUpload(ctx context.Context, c *gin.Context) s
func (service *UploadService) SlaveUpload(ctx context.Context, c *gin.Context) serializer.Response { func (service *UploadService) SlaveUpload(ctx context.Context, c *gin.Context) serializer.Response {
uploadSessionRaw, ok := cache.Get(filesystem.UploadSessionCachePrefix + service.ID) uploadSessionRaw, ok := cache.Get(filesystem.UploadSessionCachePrefix + service.ID)
if !ok { if !ok {
return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session expired or not exist", nil) return serializer.Err(serializer.CodeUploadSessionExpired, "Slave upload session expired or not exist", nil)
} }
uploadSession := uploadSessionRaw.(serializer.UploadSession) uploadSession := uploadSessionRaw.(serializer.UploadSession)

Loading…
Cancel
Save