diff --git a/pkg/filesystem/hooks.go b/pkg/filesystem/hooks.go index fdde788..a4115f5 100644 --- a/pkg/filesystem/hooks.go +++ b/pkg/filesystem/hooks.go @@ -219,6 +219,10 @@ func SlaveAfterUpload(ctx context.Context, fs *FileSystem) error { } fs.GenerateThumbnail(ctx, &file) + if policy.CallbackURL == "" { + return nil + } + // 发送回调请求 callbackBody := serializer.RemoteUploadCallback{ Name: file.Name, diff --git a/pkg/filesystem/remote/handler.go b/pkg/filesystem/remote/handler.go index 281c295..cb6920f 100644 --- a/pkg/filesystem/remote/handler.go +++ b/pkg/filesystem/remote/handler.go @@ -35,12 +35,12 @@ func (handler Handler) getAPIUrl(scope string, routes ...string) string { var controller *url.URL switch scope { + case "upload": + controller, _ = url.Parse("/api/v3/slave/upload") case "delete": controller, _ = url.Parse("/api/v3/slave/delete") case "thumb": controller, _ = url.Parse("/api/v3/slave/thumb") - case "remote_callback": - controller, _ = url.Parse("/api/v3/callback/remote") default: controller = serverURL } @@ -53,6 +53,7 @@ func (handler Handler) getAPIUrl(scope string, routes ...string) string { } // Get 获取文件内容 +// TODO 测试 func (handler Handler) Get(ctx context.Context, path string) (response.RSCloser, error) { // 尝试获取速度限制 TODO 是否需要在这里限制? speedLimit := 0 @@ -73,17 +74,53 @@ func (handler Handler) Get(ctx context.Context, path string) (response.RSCloser, nil, request.WithContext(ctx), ).CheckHTTPResponse(200).GetRSCloser() - if err != nil { return nil, err } + resp.SetFirstFakeChunk() return resp, nil } // Put 将文件流保存到指定目录 func (handler Handler) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error { - return errors.New("远程策略不支持此上传方式") + defer file.Close() + + // 凭证有效期 + credentialTTL := model.GetIntSetting("upload_credential_timeout", 3600) + + // 生成上传策略 + policy := serializer.UploadPolicy{ + SavePath: path.Dir(dst), + FileName: path.Base(dst), + AutoRename: false, + MaxSize: size, + } + credential, err := handler.getUploadCredential(ctx, policy, int64(credentialTTL)) + if err != nil { + return err + } + + // 上传文件 + resp, err := handler.Client.Request( + "POST", + handler.getAPIUrl("upload"), + file, + request.WithHeader(map[string][]string{ + "Authorization": {credential.Token}, + "X-Policy": {credential.Policy}, + "X-FileName": {path.Base(dst)}, + }), + request.WithContentLength(int64(size)), + ).CheckHTTPResponse(200).DecodeResponse() + if err != nil { + return err + } + if resp.Code != 0 { + return errors.New(resp.Msg) + } + + return nil } // Delete 删除一个或多个文件, @@ -195,7 +232,9 @@ func (handler Handler) Source( // Token 获取上传策略和认证Token func (handler Handler) Token(ctx context.Context, TTL int64, key string) (serializer.UploadCredential, error) { // 生成回调地址 - apiURL := handler.getAPIUrl("remote_callback", key) + siteURL := model.GetSiteURL() + apiBaseURI, _ := url.Parse("/api/v3/callback/remote/" + key) + apiURL := siteURL.ResolveReference(apiBaseURI) // 生成上传策略 policy := serializer.UploadPolicy{ @@ -204,7 +243,7 @@ func (handler Handler) Token(ctx context.Context, TTL int64, key string) (serial AutoRename: handler.Policy.AutoRename, MaxSize: handler.Policy.MaxSize, AllowedExtension: handler.Policy.OptionsSerialized.FileType, - CallbackURL: apiURL, + CallbackURL: apiURL.String(), } return handler.getUploadCredential(ctx, policy, TTL) } diff --git a/pkg/request/request.go b/pkg/request/request.go index 7d5e496..f900a96 100644 --- a/pkg/request/request.go +++ b/pkg/request/request.go @@ -2,10 +2,12 @@ package request import ( "context" + "encoding/json" "errors" "fmt" "github.com/HFO4/cloudreve/pkg/auth" - "github.com/HFO4/cloudreve/pkg/filesystem/response" + "github.com/HFO4/cloudreve/pkg/serializer" + "github.com/HFO4/cloudreve/pkg/util" "io" "io/ioutil" "net/http" @@ -36,11 +38,12 @@ type Option interface { } type options struct { - timeout time.Duration - header http.Header - sign auth.Auth - signTTL int64 - ctx context.Context + timeout time.Duration + header http.Header + sign auth.Auth + signTTL int64 + ctx context.Context + contentLength int64 } type optionFunc func(*options) @@ -51,8 +54,9 @@ func (f optionFunc) apply(o *options) { func newDefaultOption() *options { return &options{ - header: http.Header{}, - timeout: time.Duration(30) * time.Second, + header: http.Header{}, + timeout: time.Duration(30) * time.Second, + contentLength: -1, } } @@ -85,6 +89,14 @@ func WithHeader(header http.Header) Option { }) } +// WithContentLength 设置请求大小 +// TODO 测试 +func WithContentLength(s int64) Option { + return optionFunc(func(o *options) { + o.contentLength = s + }) +} + // Request 发送HTTP请求 func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Option) *Response { // 应用额外设置 @@ -96,6 +108,11 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio // 创建请求客户端 client := &http.Client{Timeout: options.timeout} + // size为0时将body设为nil + if options.contentLength == 0 { + body = nil + } + // 创建请求 var ( req *http.Request @@ -110,8 +127,11 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio return &Response{Err: err} } - // 添加请求header + // 添加请求相关设置 req.Header = options.header + if options.contentLength != -1 { + req.ContentLength = options.contentLength + } // 签名请求 if options.sign != nil { @@ -149,35 +169,78 @@ func (resp *Response) CheckHTTPResponse(status int) *Response { return resp } -type nopRSCloser struct { - body io.ReadCloser - size int64 +// DecodeResponse 尝试解析为serializer.Response,并对状态码进行检查 +// TODO 测试 +func (resp *Response) DecodeResponse() (*serializer.Response, error) { + if resp.Err != nil { + return nil, resp.Err + } + + respString, err := resp.GetResponse() + if err != nil { + return nil, err + } + + var res serializer.Response + err = json.Unmarshal([]byte(respString), &res) + if err != nil { + util.Log().Debug("无法解析回调服务端响应:%s", string(respString)) + return nil, err + } + return &res, nil +} + +// NopRSCloser 实现不完整seeker +type NopRSCloser struct { + body io.ReadCloser + size int64 + status *rscStatus } -// GetRSCloser 返回带有空seeker的body reader -func (resp *Response) GetRSCloser() (response.RSCloser, error) { +type rscStatus struct { + // http.ServeContent 会读取一小块以决定内容类型, + // 但是响应body无法实现seek,所以此项为真时第一个read会返回假数据 + IgnoreFirst bool +} + +// GetRSCloser 返回带有空seeker的RSCloser,供http.ServeContent使用 +func (resp *Response) GetRSCloser() (*NopRSCloser, error) { if resp.Err != nil { return nil, resp.Err } - return nopRSCloser{ - body: resp.Response.Body, - size: resp.Response.ContentLength, + return &NopRSCloser{ + body: resp.Response.Body, + size: resp.Response.ContentLength, + status: &rscStatus{}, }, resp.Err } -// Read 实现 nopRSCloser reader -func (instance nopRSCloser) Read(p []byte) (n int, err error) { +// SetFirstFakeChunk 开启第一次read返回空数据 +// TODO 测试 +func (instance NopRSCloser) SetFirstFakeChunk() { + instance.status.IgnoreFirst = true +} + +// Read 实现 NopRSCloser reader +func (instance NopRSCloser) Read(p []byte) (n int, err error) { + if instance.status.IgnoreFirst { + return 0, io.EOF + } return instance.body.Read(p) } -// 实现 nopRSCloser closer -func (instance nopRSCloser) Close() error { +// Close 实现 NopRSCloser closer +func (instance NopRSCloser) Close() error { return instance.body.Close() } -// 实现 nopRSCloser seeker, 只实现seek开头/结尾以便http.ServeContent用于确定正文大小 -func (instance nopRSCloser) Seek(offset int64, whence int) (int64, error) { +// Seek 实现 NopRSCloser seeker, 只实现seek开头/结尾以便http.ServeContent用于确定正文大小 +func (instance NopRSCloser) Seek(offset int64, whence int) (int64, error) { + // 进行第一次Seek操作后,取消忽略选项 + if instance.status.IgnoreFirst { + instance.status.IgnoreFirst = false + } if offset == 0 { switch whence { case io.SeekStart: diff --git a/pkg/request/slave.go b/pkg/request/slave.go index 261639d..79446a8 100644 --- a/pkg/request/slave.go +++ b/pkg/request/slave.go @@ -7,7 +7,6 @@ import ( "github.com/HFO4/cloudreve/pkg/auth" "github.com/HFO4/cloudreve/pkg/conf" "github.com/HFO4/cloudreve/pkg/serializer" - "github.com/HFO4/cloudreve/pkg/util" "time" ) @@ -34,20 +33,15 @@ func RemoteCallback(url string, body serializer.RemoteUploadCallback) error { return serializer.NewError(serializer.CodeCallbackError, "无法发起回调请求", resp.Err) } - // 检查返回HTTP状态码 - rawResp, err := resp.CheckHTTPResponse(200).GetResponse() - if err != nil { - return serializer.NewError(serializer.CodeCallbackError, "服务器返回异常响应", err) - } - // 解析回调服务端响应 - var response serializer.Response - err = json.Unmarshal([]byte(rawResp), &response) + resp = resp.CheckHTTPResponse(200) + if resp.Err != nil { + return serializer.NewError(serializer.CodeCallbackError, "服务器返回异常响应", resp.Err) + } + response, err := resp.DecodeResponse() if err != nil { - util.Log().Debug("无法解析回调服务端响应:%s", string(rawResp)) return serializer.NewError(serializer.CodeCallbackError, "无法解析服务端返回的响应", err) } - if response.Code != 0 { return serializer.NewError(response.Code, response.Msg, errors.New(response.Error)) }