Feat: remoter handler get content

pull/247/head
HFO4 5 years ago
parent c23d129dbb
commit 4b6fbc0de2

@ -219,6 +219,10 @@ func SlaveAfterUpload(ctx context.Context, fs *FileSystem) error {
} }
fs.GenerateThumbnail(ctx, &file) fs.GenerateThumbnail(ctx, &file)
if policy.CallbackURL == "" {
return nil
}
// 发送回调请求 // 发送回调请求
callbackBody := serializer.RemoteUploadCallback{ callbackBody := serializer.RemoteUploadCallback{
Name: file.Name, Name: file.Name,

@ -35,12 +35,12 @@ func (handler Handler) getAPIUrl(scope string, routes ...string) string {
var controller *url.URL var controller *url.URL
switch scope { switch scope {
case "upload":
controller, _ = url.Parse("/api/v3/slave/upload")
case "delete": case "delete":
controller, _ = url.Parse("/api/v3/slave/delete") controller, _ = url.Parse("/api/v3/slave/delete")
case "thumb": case "thumb":
controller, _ = url.Parse("/api/v3/slave/thumb") controller, _ = url.Parse("/api/v3/slave/thumb")
case "remote_callback":
controller, _ = url.Parse("/api/v3/callback/remote")
default: default:
controller = serverURL controller = serverURL
} }
@ -53,6 +53,7 @@ func (handler Handler) getAPIUrl(scope string, routes ...string) string {
} }
// Get 获取文件内容 // Get 获取文件内容
// TODO 测试
func (handler Handler) Get(ctx context.Context, path string) (response.RSCloser, error) { func (handler Handler) Get(ctx context.Context, path string) (response.RSCloser, error) {
// 尝试获取速度限制 TODO 是否需要在这里限制? // 尝试获取速度限制 TODO 是否需要在这里限制?
speedLimit := 0 speedLimit := 0
@ -73,17 +74,53 @@ func (handler Handler) Get(ctx context.Context, path string) (response.RSCloser,
nil, nil,
request.WithContext(ctx), request.WithContext(ctx),
).CheckHTTPResponse(200).GetRSCloser() ).CheckHTTPResponse(200).GetRSCloser()
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp.SetFirstFakeChunk()
return resp, nil return resp, nil
} }
// Put 将文件流保存到指定目录 // Put 将文件流保存到指定目录
func (handler Handler) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error { func (handler Handler) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
return errors.New("远程策略不支持此上传方式") defer file.Close()
// 凭证有效期
credentialTTL := model.GetIntSetting("upload_credential_timeout", 3600)
// 生成上传策略
policy := serializer.UploadPolicy{
SavePath: path.Dir(dst),
FileName: path.Base(dst),
AutoRename: false,
MaxSize: size,
}
credential, err := handler.getUploadCredential(ctx, policy, int64(credentialTTL))
if err != nil {
return err
}
// 上传文件
resp, err := handler.Client.Request(
"POST",
handler.getAPIUrl("upload"),
file,
request.WithHeader(map[string][]string{
"Authorization": {credential.Token},
"X-Policy": {credential.Policy},
"X-FileName": {path.Base(dst)},
}),
request.WithContentLength(int64(size)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
}
if resp.Code != 0 {
return errors.New(resp.Msg)
}
return nil
} }
// Delete 删除一个或多个文件, // Delete 删除一个或多个文件,
@ -195,7 +232,9 @@ func (handler Handler) Source(
// Token 获取上传策略和认证Token // Token 获取上传策略和认证Token
func (handler Handler) Token(ctx context.Context, TTL int64, key string) (serializer.UploadCredential, error) { func (handler Handler) Token(ctx context.Context, TTL int64, key string) (serializer.UploadCredential, error) {
// 生成回调地址 // 生成回调地址
apiURL := handler.getAPIUrl("remote_callback", key) siteURL := model.GetSiteURL()
apiBaseURI, _ := url.Parse("/api/v3/callback/remote/" + key)
apiURL := siteURL.ResolveReference(apiBaseURI)
// 生成上传策略 // 生成上传策略
policy := serializer.UploadPolicy{ policy := serializer.UploadPolicy{
@ -204,7 +243,7 @@ func (handler Handler) Token(ctx context.Context, TTL int64, key string) (serial
AutoRename: handler.Policy.AutoRename, AutoRename: handler.Policy.AutoRename,
MaxSize: handler.Policy.MaxSize, MaxSize: handler.Policy.MaxSize,
AllowedExtension: handler.Policy.OptionsSerialized.FileType, AllowedExtension: handler.Policy.OptionsSerialized.FileType,
CallbackURL: apiURL, CallbackURL: apiURL.String(),
} }
return handler.getUploadCredential(ctx, policy, TTL) return handler.getUploadCredential(ctx, policy, TTL)
} }

@ -2,10 +2,12 @@ package request
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/HFO4/cloudreve/pkg/auth" "github.com/HFO4/cloudreve/pkg/auth"
"github.com/HFO4/cloudreve/pkg/filesystem/response" "github.com/HFO4/cloudreve/pkg/serializer"
"github.com/HFO4/cloudreve/pkg/util"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -36,11 +38,12 @@ type Option interface {
} }
type options struct { type options struct {
timeout time.Duration timeout time.Duration
header http.Header header http.Header
sign auth.Auth sign auth.Auth
signTTL int64 signTTL int64
ctx context.Context ctx context.Context
contentLength int64
} }
type optionFunc func(*options) type optionFunc func(*options)
@ -51,8 +54,9 @@ func (f optionFunc) apply(o *options) {
func newDefaultOption() *options { func newDefaultOption() *options {
return &options{ return &options{
header: http.Header{}, header: http.Header{},
timeout: time.Duration(30) * time.Second, timeout: time.Duration(30) * time.Second,
contentLength: -1,
} }
} }
@ -85,6 +89,14 @@ func WithHeader(header http.Header) Option {
}) })
} }
// WithContentLength 设置请求大小
// TODO 测试
func WithContentLength(s int64) Option {
return optionFunc(func(o *options) {
o.contentLength = s
})
}
// Request 发送HTTP请求 // Request 发送HTTP请求
func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Option) *Response { func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Option) *Response {
// 应用额外设置 // 应用额外设置
@ -96,6 +108,11 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio
// 创建请求客户端 // 创建请求客户端
client := &http.Client{Timeout: options.timeout} client := &http.Client{Timeout: options.timeout}
// size为0时将body设为nil
if options.contentLength == 0 {
body = nil
}
// 创建请求 // 创建请求
var ( var (
req *http.Request req *http.Request
@ -110,8 +127,11 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio
return &Response{Err: err} return &Response{Err: err}
} }
// 添加请求header // 添加请求相关设置
req.Header = options.header req.Header = options.header
if options.contentLength != -1 {
req.ContentLength = options.contentLength
}
// 签名请求 // 签名请求
if options.sign != nil { if options.sign != nil {
@ -149,35 +169,78 @@ func (resp *Response) CheckHTTPResponse(status int) *Response {
return resp return resp
} }
type nopRSCloser struct { // DecodeResponse 尝试解析为serializer.Response并对状态码进行检查
body io.ReadCloser // TODO 测试
size int64 func (resp *Response) DecodeResponse() (*serializer.Response, error) {
if resp.Err != nil {
return nil, resp.Err
}
respString, err := resp.GetResponse()
if err != nil {
return nil, err
}
var res serializer.Response
err = json.Unmarshal([]byte(respString), &res)
if err != nil {
util.Log().Debug("无法解析回调服务端响应:%s", string(respString))
return nil, err
}
return &res, nil
}
// NopRSCloser 实现不完整seeker
type NopRSCloser struct {
body io.ReadCloser
size int64
status *rscStatus
} }
// GetRSCloser 返回带有空seeker的body reader type rscStatus struct {
func (resp *Response) GetRSCloser() (response.RSCloser, error) { // http.ServeContent 会读取一小块以决定内容类型,
// 但是响应body无法实现seek所以此项为真时第一个read会返回假数据
IgnoreFirst bool
}
// GetRSCloser 返回带有空seeker的RSCloser供http.ServeContent使用
func (resp *Response) GetRSCloser() (*NopRSCloser, error) {
if resp.Err != nil { if resp.Err != nil {
return nil, resp.Err return nil, resp.Err
} }
return nopRSCloser{ return &NopRSCloser{
body: resp.Response.Body, body: resp.Response.Body,
size: resp.Response.ContentLength, size: resp.Response.ContentLength,
status: &rscStatus{},
}, resp.Err }, resp.Err
} }
// Read 实现 nopRSCloser reader // SetFirstFakeChunk 开启第一次read返回空数据
func (instance nopRSCloser) Read(p []byte) (n int, err error) { // TODO 测试
func (instance NopRSCloser) SetFirstFakeChunk() {
instance.status.IgnoreFirst = true
}
// Read 实现 NopRSCloser reader
func (instance NopRSCloser) Read(p []byte) (n int, err error) {
if instance.status.IgnoreFirst {
return 0, io.EOF
}
return instance.body.Read(p) return instance.body.Read(p)
} }
// 实现 nopRSCloser closer // Close 实现 NopRSCloser closer
func (instance nopRSCloser) Close() error { func (instance NopRSCloser) Close() error {
return instance.body.Close() return instance.body.Close()
} }
// 实现 nopRSCloser seeker, 只实现seek开头/结尾以便http.ServeContent用于确定正文大小 // Seek 实现 NopRSCloser seeker, 只实现seek开头/结尾以便http.ServeContent用于确定正文大小
func (instance nopRSCloser) Seek(offset int64, whence int) (int64, error) { func (instance NopRSCloser) Seek(offset int64, whence int) (int64, error) {
// 进行第一次Seek操作后取消忽略选项
if instance.status.IgnoreFirst {
instance.status.IgnoreFirst = false
}
if offset == 0 { if offset == 0 {
switch whence { switch whence {
case io.SeekStart: case io.SeekStart:

@ -7,7 +7,6 @@ import (
"github.com/HFO4/cloudreve/pkg/auth" "github.com/HFO4/cloudreve/pkg/auth"
"github.com/HFO4/cloudreve/pkg/conf" "github.com/HFO4/cloudreve/pkg/conf"
"github.com/HFO4/cloudreve/pkg/serializer" "github.com/HFO4/cloudreve/pkg/serializer"
"github.com/HFO4/cloudreve/pkg/util"
"time" "time"
) )
@ -34,20 +33,15 @@ func RemoteCallback(url string, body serializer.RemoteUploadCallback) error {
return serializer.NewError(serializer.CodeCallbackError, "无法发起回调请求", resp.Err) return serializer.NewError(serializer.CodeCallbackError, "无法发起回调请求", resp.Err)
} }
// 检查返回HTTP状态码
rawResp, err := resp.CheckHTTPResponse(200).GetResponse()
if err != nil {
return serializer.NewError(serializer.CodeCallbackError, "服务器返回异常响应", err)
}
// 解析回调服务端响应 // 解析回调服务端响应
var response serializer.Response resp = resp.CheckHTTPResponse(200)
err = json.Unmarshal([]byte(rawResp), &response) if resp.Err != nil {
return serializer.NewError(serializer.CodeCallbackError, "服务器返回异常响应", resp.Err)
}
response, err := resp.DecodeResponse()
if err != nil { if err != nil {
util.Log().Debug("无法解析回调服务端响应:%s", string(rawResp))
return serializer.NewError(serializer.CodeCallbackError, "无法解析服务端返回的响应", err) return serializer.NewError(serializer.CodeCallbackError, "无法解析服务端返回的响应", err)
} }
if response.Code != 0 { if response.Code != 0 {
return serializer.NewError(response.Code, response.Msg, errors.New(response.Error)) return serializer.NewError(response.Code, response.Msg, errors.New(response.Error))
} }

Loading…
Cancel
Save