diff --git a/models/migration.go b/models/migration.go index 230f35a..1053e65 100644 --- a/models/migration.go +++ b/models/migration.go @@ -125,6 +125,7 @@ func addDefaultSettings() { {Name: "onedrive_callback_check", Value: `20`, Type: "timeout"}, {Name: "folder_props_timeout", Value: `300`, Type: "timeout"}, {Name: "onedrive_chunk_retries", Value: `1`, Type: "retry"}, + {Name: "slave_chunk_retries", Value: `1`, Type: "retry"}, {Name: "onedrive_source_timeout", Value: `1800`, Type: "timeout"}, {Name: "reset_after_upload_failed", Value: `0`, Type: "upload"}, {Name: "login_captcha", Value: `0`, Type: "login"}, diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go index cb059c6..49e2a48 100644 --- a/pkg/cluster/slave.go +++ b/pkg/cluster/slave.go @@ -44,7 +44,7 @@ func (node *SlaveNode) Init(nodeModel *model.Node) { var endpoint *url.URL if serverURL, err := url.Parse(node.Model.Server); err == nil { var controller *url.URL - controller, _ = url.Parse("/api/v3/slave") + controller, _ = url.Parse("/api/v3/slave/") endpoint = serverURL.ResolveReference(controller) } diff --git a/pkg/filesystem/driver/remote/client.go b/pkg/filesystem/driver/remote/client.go index d153ab3..073ae40 100644 --- a/pkg/filesystem/driver/remote/client.go +++ b/pkg/filesystem/driver/remote/client.go @@ -3,19 +3,26 @@ package remote import ( "context" "encoding/json" + "fmt" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/auth" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/request" + "github.com/cloudreve/Cloudreve/v3/pkg/retry" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/gofrs/uuid" + "io" "net/http" "net/url" "path" "strings" + "time" ) const ( - basePath = "/api/v3/slave" + basePath = "/api/v3/slave/" OverwriteHeader = auth.CrHeaderPrefix + "Overwrite" + chunkRetrySleep = time.Duration(5) * time.Second ) // Client to operate remote slave server @@ -24,6 +31,8 @@ type Client interface { CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error // GetUploadURL signs an url for uploading file GetUploadURL(ttl int64, sessionID string) (string, string, error) + // Upload uploads file to remote server + Upload(ctx context.Context, file fsctx.FileHeader) error } // NewClient creates new Client from given policy @@ -54,6 +63,62 @@ type remoteClient struct { httpClient request.Client } +func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error { + ttl := model.GetIntSetting("upload_session_timeout", 86400) + fileInfo := file.Info() + session := &serializer.UploadSession{ + Key: uuid.Must(uuid.NewV4()).String(), + VirtualPath: fileInfo.VirtualPath, + Name: fileInfo.FileName, + Size: fileInfo.Size, + SavePath: fileInfo.SavePath, + LastModified: fileInfo.LastModified, + Policy: *c.policy, + } + + // Create upload session + if err := c.CreateUploadSession(ctx, session, int64(ttl)); err != nil { + return fmt.Errorf("failed to create upload session: %w", err) + } + + overwrite := fileInfo.Mode&fsctx.Overwrite == fsctx.Overwrite + + // Upload chunks + offset := uint64(0) + chunkSize := session.Policy.OptionsSerialized.ChunkSize + if chunkSize == 0 { + chunkSize = fileInfo.Size + } + + chunkNum := fileInfo.Size / chunkSize + if fileInfo.Size%chunkSize != 0 { + chunkNum++ + } + + for i := 0; i < int(chunkNum); i++ { + uploadFunc := func(index int, chunk io.Reader) error { + contentLength := chunkSize + if index == int(chunkNum-1) { + contentLength = fileInfo.Size - chunkSize*(chunkNum-1) + } + + return c.uploadChunk(ctx, session.Key, index, chunk, overwrite, contentLength) + } + + if err := retry.Chunk(i, chunkSize, file, uploadFunc, retry.ConstantBackoff{ + Max: model.GetIntSetting("onedrive_chunk_retries", 1), + Sleep: chunkRetrySleep, + }); err != nil { + // TODO 删除上传会话 + return fmt.Errorf("failed to upload chunk #%d: %w", i, err) + } + + offset += chunkSize + } + + return nil +} + func (c *remoteClient) CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error { reqBodyEncoded, err := json.Marshal(map[string]interface{}{ "session": session, @@ -96,3 +161,24 @@ func (c *remoteClient) GetUploadURL(ttl int64, sessionID string) (string, string req = auth.SignRequest(c.authInstance, req, ttl) return req.URL.String(), req.Header["Authorization"][0], nil } + +func (c *remoteClient) uploadChunk(ctx context.Context, sessionID string, index int, chunk io.Reader, overwrite bool, size uint64) error { + resp, err := c.httpClient.Request( + "POST", + fmt.Sprintf("upload/%s?chunk=%d", sessionID, index), + io.LimitReader(chunk, int64(size)), + request.WithContext(ctx), + request.WithTimeout(time.Duration(0)), + request.WithContentLength(int64(size)), + request.WithHeader(map[string][]string{OverwriteHeader: {fmt.Sprintf("%t", overwrite)}}), + ).CheckHTTPResponse(200).DecodeResponse() + if err != nil { + return err + } + + if resp.Code != 0 { + return serializer.NewErrorFromResponse(resp) + } + + return nil +} diff --git a/pkg/filesystem/driver/remote/handler.go b/pkg/filesystem/driver/remote/handler.go index 00909c3..6fe1c46 100644 --- a/pkg/filesystem/driver/remote/handler.go +++ b/pkg/filesystem/driver/remote/handler.go @@ -153,55 +153,7 @@ func (handler *Driver) Get(ctx context.Context, path string) (response.RSCloser, func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error { defer file.Close() - // 凭证有效期 - credentialTTL := model.GetIntSetting("upload_credential_timeout", 3600) - - // 生成上传策略 - fileInfo := file.Info() - policy := serializer.UploadPolicy{ - SavePath: path.Dir(fileInfo.SavePath), - FileName: path.Base(fileInfo.FileName), - AutoRename: false, - MaxSize: fileInfo.Size, - } - credential, err := handler.getUploadCredential(ctx, policy, int64(credentialTTL)) - if err != nil { - return err - } - - // 对文件名进行URLEncode - fileName := url.QueryEscape(path.Base(fileInfo.SavePath)) - - // 决定是否要禁用文件覆盖 - overwrite := "false" - if fileInfo.Mode&fsctx.Overwrite == fsctx.Overwrite { - overwrite = "true" - } - - // 上传文件 - resp, err := handler.Client.Request( - "POST", - handler.Policy.GetUploadURL(), - file, - request.WithHeader(map[string][]string{ - "X-Cr-Policy": {credential.Policy}, - "X-Cr-FileName": {fileName}, - "X-Cr-Overwrite": {overwrite}, - }), - request.WithContentLength(int64(fileInfo.Size)), - request.WithTimeout(time.Duration(0)), - request.WithMasterMeta(), - request.WithSlaveMeta(handler.Policy.AccessKey), - request.WithCredential(handler.AuthInstance, int64(credentialTTL)), - ).CheckHTTPResponse(200).DecodeResponse() - if err != nil { - return err - } - if resp.Code != 0 { - return errors.New(resp.Msg) - } - - return nil + return handler.client.Upload(ctx, file) } // Delete 删除一个或多个文件, diff --git a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go index 9a79a1a..11e5f9a 100644 --- a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go @@ -30,7 +30,7 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) var endpoint *url.URL if serverURL, err := url.Parse(node.DBModel().Server); err == nil { var controller *url.URL - controller, _ = url.Parse("/api/v3/slave") + controller, _ = url.Parse("/api/v3/slave/") endpoint = serverURL.ResolveReference(controller) } diff --git a/pkg/filesystem/fsctx/stream.go b/pkg/filesystem/fsctx/stream.go index 5d1ea76..122ead5 100644 --- a/pkg/filesystem/fsctx/stream.go +++ b/pkg/filesystem/fsctx/stream.go @@ -32,9 +32,11 @@ type UploadTaskInfo struct { type FileHeader interface { io.Reader io.Closer + io.Seeker Info() *UploadTaskInfo SetSize(uint64) SetModel(fileModel interface{}) + Seekable() bool } // FileStream 用户传来的文件 @@ -43,6 +45,7 @@ type FileStream struct { LastModified *time.Time Metadata map[string]string File io.ReadCloser + Seeker io.Seeker Size uint64 VirtualPath string Name string @@ -61,6 +64,14 @@ func (file *FileStream) Close() error { return file.File.Close() } +func (file *FileStream) Seek(offset int64, whence int) (int64, error) { + return file.Seeker.Seek(offset, whence) +} + +func (file *FileStream) Seekable() bool { + return file.Seeker != nil +} + func (file *FileStream) Info() *UploadTaskInfo { return &UploadTaskInfo{ Size: file.Size, diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go index f6ec0d2..2ca01bd 100644 --- a/pkg/filesystem/upload.go +++ b/pkg/filesystem/upload.go @@ -261,7 +261,8 @@ func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, reset // 开始上传 return fs.UploadFromStream(ctx, &fsctx.FileStream{ - File: nil, + File: file, + Seeker: file, Size: uint64(size), Name: path.Base(dst), VirtualPath: path.Dir(dst), diff --git a/pkg/request/options.go b/pkg/request/options.go index d495757..bb9b11c 100644 --- a/pkg/request/options.go +++ b/pkg/request/options.go @@ -5,6 +5,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/auth" "net/http" "net/url" + "strings" "time" ) @@ -103,6 +104,10 @@ func WithSlaveMeta(s string) Option { // Endpoint 使用同一的请求Endpoint func WithEndpoint(endpoint string) Option { + if !strings.HasSuffix(endpoint, "/") { + endpoint += "/" + } + endpointURL, _ := url.Parse(endpoint) return optionFunc(func(o *options) { o.endpoint = endpointURL diff --git a/pkg/request/request.go b/pkg/request/request.go index 7411ab8..eb7996c 100644 --- a/pkg/request/request.go +++ b/pkg/request/request.go @@ -7,7 +7,7 @@ import ( "io" "io/ioutil" "net/http" - "path" + "net/url" "strings" "sync" @@ -70,9 +70,13 @@ func (c *HTTPClient) Request(method, target string, body io.Reader, opts ...Opti // 确定请求URL if options.endpoint != nil { + targetPath, err := url.Parse(target) + if err != nil { + return &Response{Err: err} + } + targetURL := *options.endpoint - targetURL.Path = path.Join(targetURL.Path, target) - target = targetURL.String() + target = targetURL.ResolveReference(targetPath).String() } // 创建请求 diff --git a/pkg/retry/backoff.go b/pkg/retry/backoff.go new file mode 100644 index 0000000..a64c7c7 --- /dev/null +++ b/pkg/retry/backoff.go @@ -0,0 +1,26 @@ +package retry + +import "time" + +// Backoff used for retry sleep backoff +type Backoff interface { + Next() bool +} + +// ConstantBackoff implements Backoff interface with constant sleep time +type ConstantBackoff struct { + Sleep time.Duration + Max int + + tried int +} + +func (c ConstantBackoff) Next() bool { + c.tried++ + if c.tried >= c.Max { + return false + } + + time.Sleep(c.Sleep) + return true +} diff --git a/pkg/retry/chunk.go b/pkg/retry/chunk.go new file mode 100644 index 0000000..f8ce75a --- /dev/null +++ b/pkg/retry/chunk.go @@ -0,0 +1,29 @@ +package retry + +import ( + "context" + "fmt" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" + "github.com/cloudreve/Cloudreve/v3/pkg/util" + "io" +) + +type ChunkProcessFunc func(index int, chunk io.Reader) error + +func Chunk(index int, chunkSize uint64, file fsctx.FileHeader, processor ChunkProcessFunc, backoff Backoff) error { + err := processor(index, file) + if err != nil { + if err != context.Canceled && file.Seekable() && backoff.Next() { + if _, seekErr := file.Seek(int64(uint64(index)*chunkSize), io.SeekStart); err != nil { + return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err) + } + + util.Log().Debug("Retrying chunk %d, last error: %s", index, err) + return Chunk(index, chunkSize, file, processor, backoff) + } + + return err + } + + return nil +} diff --git a/service/explorer/upload.go b/service/explorer/upload.go index 4db5f1c..f7e3d07 100644 --- a/service/explorer/upload.go +++ b/service/explorer/upload.go @@ -87,13 +87,13 @@ func (service *UploadService) LocalUpload(ctx context.Context, c *gin.Context) s } if uploadSession.UID != fs.User.ID { - return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session expired or not exist", nil) + return serializer.Err(serializer.CodeUploadSessionExpired, "Local upload session expired or not exist", nil) } // 查找上传会话创建的占位文件 file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID) if err != nil { - return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session file placeholder not exist", err) + return serializer.Err(serializer.CodeUploadSessionExpired, "Local upload session file placeholder not exist", err) } // 重设 fs 存储策略 @@ -127,7 +127,7 @@ func (service *UploadService) LocalUpload(ctx context.Context, c *gin.Context) s func (service *UploadService) SlaveUpload(ctx context.Context, c *gin.Context) serializer.Response { uploadSessionRaw, ok := cache.Get(filesystem.UploadSessionCachePrefix + service.ID) if !ok { - return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session expired or not exist", nil) + return serializer.Err(serializer.CodeUploadSessionExpired, "Slave upload session expired or not exist", nil) } uploadSession := uploadSessionRaw.(serializer.UploadSession)