Feat: resume upload in server side

pull/247/head
HFO4 5 years ago
parent 06ff8b5a50
commit 752ce5ce62

@ -109,6 +109,7 @@ solid #e9e9e9;"bgcolor="#fff"><tbody><tr style="font-family: 'Helvetica Neue',He
{Name: "slave_api_timeout", Value: `60`, Type: "timeout"},
{Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"},
{Name: "onedrive_callback_check", Value: `20`, Type: "timeout"},
{Name: "onedrive_chunk_retries", Value: `1`, Type: "retry"},
{Name: "allowdVisitorDownload", Value: `false`, Type: "share"},
{Name: "login_captcha", Value: `0`, Type: "login"},
{Name: "qq_login", Value: `0`, Type: "login"},

@ -9,6 +9,7 @@ import (
"github.com/HFO4/cloudreve/pkg/cache"
"github.com/HFO4/cloudreve/pkg/request"
"github.com/HFO4/cloudreve/pkg/util"
"github.com/cloudflare/cfssl/log"
"io"
"io/ioutil"
"net/http"
@ -57,10 +58,16 @@ func (client *Client) getRequestURL(api string) string {
return base.String()
}
// Meta 根据资源ID获取文件元信息
func (client *Client) Meta(ctx context.Context, id string) (*FileInfo, error) {
// Meta 根据资源ID或文件路径获取文件元信息
func (client *Client) Meta(ctx context.Context, id string, path string) (*FileInfo, error) {
var requestURL string
if id != "" {
requestURL = client.getRequestURL("/me/drive/items/" + id)
} else {
dst := strings.TrimPrefix(path, "/")
requestURL = client.getRequestURL("me/drive/root:/" + dst)
}
requestURL := client.getRequestURL("/me/drive/items/" + id)
res, err := client.requestWithStr(ctx, "GET", requestURL+"?expand=thumbnails", "", 200)
if err != nil {
return nil, err
@ -132,6 +139,89 @@ func (client *Client) GetUploadSessionStatus(ctx context.Context, uploadURL stri
return &uploadSession, nil
}
// UploadChunk 上传分片
func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *Chunk) (*UploadSessionResponse, error) {
res, err := client.request(
ctx, "PUT", uploadURL, chunk.Reader,
request.WithContentLength(int64(chunk.ChunkSize)),
request.WithHeader(http.Header{
"Content-Range": {fmt.Sprintf("bytes %d-%d/%d", chunk.Offset, chunk.Offset+chunk.ChunkSize-1, chunk.Total)},
}),
request.WithoutHeader([]string{"Authorization", "Content-Type"}),
)
if err != nil {
// 如果重试次数小于限制5秒后重试
if chunk.Retried < model.GetIntSetting("onedrive_chunk_retries", 1) {
chunk.Retried++
log.Debug("分片偏移%d上传失败5秒钟后重试", chunk.Offset)
time.Sleep(time.Duration(5) * time.Second)
return client.UploadChunk(ctx, uploadURL, chunk)
}
return nil, err
}
if chunk.IsLast() {
return nil, nil
}
var (
decodeErr error
uploadRes UploadSessionResponse
)
decodeErr = json.Unmarshal([]byte(res), &uploadRes)
if decodeErr != nil {
return nil, decodeErr
}
return &uploadRes, nil
}
// Upload 上传文件
func (client *Client) Upload(ctx context.Context, dst string, size int, file io.Reader) error {
// 小文件,使用简单上传接口上传
if size <= int(SmallFileSize) {
_, err := client.SimpleUpload(ctx, dst, file)
return err
}
// 大文件,进行分片
// 创建上传会话
uploadURL, err := client.CreateUploadSession(ctx, dst, WithConflictBehavior("replace"))
if err != nil {
return err
}
offset := 0
chunkNum := size / int(ChunkSize)
if size%int(ChunkSize) != 0 {
chunkNum++
}
for i := 0; i < chunkNum; i++ {
// 分块
// TODO 取消上传
chunkSize := int(ChunkSize)
if size-offset < chunkSize {
chunkSize = size - offset
}
chunk := Chunk{
Offset: offset,
ChunkSize: chunkSize,
Total: size,
Reader: &io.LimitedReader{
R: file,
N: int64(chunkSize),
},
}
// 上传
_, err := client.UploadChunk(ctx, uploadURL, &chunk)
if err != nil {
return err
}
offset += chunkSize
}
return nil
}
// DeleteUploadSession 删除上传会话
func (client *Client) DeleteUploadSession(ctx context.Context, uploadURL string) error {
_, err := client.requestWithStr(ctx, "DELETE", uploadURL, "", 204)
@ -142,12 +232,12 @@ func (client *Client) DeleteUploadSession(ctx context.Context, uploadURL string)
return nil
}
// PutFile 上传小文件到dst
func (client *Client) PutFile(ctx context.Context, dst string, body io.Reader) (*UploadResult, error) {
// SimpleUpload 上传小文件到dst
func (client *Client) SimpleUpload(ctx context.Context, dst string, body io.Reader) (*UploadResult, error) {
dst = strings.TrimPrefix(dst, "/")
requestURL := client.getRequestURL("me/drive/root:/" + dst + ":/content")
res, err := client.request(ctx, "PUT", requestURL, body, 201)
res, err := client.request(ctx, "PUT", requestURL, body)
if err != nil {
return nil, err
}
@ -263,7 +353,7 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui
case <-time.After(time.Duration(ttl) * time.Second):
// 上传会话到期,仍未完成上传,创建占位符
client.DeleteUploadSession(context.Background(), uploadURL)
_, err := client.PutFile(context.Background(), path, strings.NewReader(""))
_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""))
if err != nil {
util.Log().Debug("无法创建占位文件,%s", err)
}
@ -309,7 +399,7 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui
// 取消上传会话实测OneDrive取消上传会话后客户端还是可以上传
// 所以上传一个空文件占位,阻止客户端上传
client.DeleteUploadSession(context.Background(), uploadURL)
_, err := client.PutFile(context.Background(), path, strings.NewReader(""))
_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""))
if err != nil {
util.Log().Debug("无法创建占位文件,%s", err)
}
@ -336,7 +426,7 @@ func sysError(err error) *RespError {
}}
}
func (client *Client) request(ctx context.Context, method string, url string, body io.Reader, expectedCode int, option ...request.Option) (string, *RespError) {
func (client *Client) request(ctx context.Context, method string, url string, body io.Reader, option ...request.Option) (string, *RespError) {
// 获取凭证
err := client.UpdateCredential(ctx)
if err != nil {
@ -374,7 +464,7 @@ func (client *Client) request(ctx context.Context, method string, url string, bo
decodeErr error
)
// 如果有错误
if res.Response.StatusCode != expectedCode {
if res.Response.StatusCode < 200 && res.Response.StatusCode >= 300 {
decodeErr = json.Unmarshal([]byte(respBody), &errResp)
if decodeErr != nil {
return "", sysError(decodeErr)
@ -388,7 +478,7 @@ func (client *Client) request(ctx context.Context, method string, url string, bo
func (client *Client) requestWithStr(ctx context.Context, method string, url string, body string, expectedCode int) (string, *RespError) {
// 发送请求
bodyReader := ioutil.NopCloser(strings.NewReader(body))
return client.request(ctx, method, url, bodyReader, expectedCode,
return client.request(ctx, method, url, bodyReader,
request.WithContentLength(int64(len(body))),
)
}

@ -6,6 +6,7 @@ import (
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/filesystem/fsctx"
"github.com/HFO4/cloudreve/pkg/filesystem/response"
"github.com/HFO4/cloudreve/pkg/request"
"github.com/HFO4/cloudreve/pkg/serializer"
"io"
"net/url"
@ -13,20 +14,51 @@ import (
// Driver OneDrive 适配器
type Driver struct {
Policy *model.Policy
Client *Client
Policy *model.Policy
Client *Client
HTTPClient request.Client
}
// Get 获取文件
func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
return nil, errors.New("未实现")
// 获取文件源地址
downloadURL, err := handler.Source(
ctx,
path,
url.URL{},
int64(model.GetIntSetting("preview_timeout", 60)),
false,
0,
)
if err != nil {
return nil, err
}
// 获取文件数据流
resp, err := handler.HTTPClient.Request(
"GET",
downloadURL,
nil,
request.WithContext(ctx),
).CheckHTTPResponse(200).GetRSCloser()
if err != nil {
return nil, err
}
resp.SetFirstFakeChunk()
// 尝试自主获取文件大小
if file, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok {
resp.SetContentLength(int64(file.Size))
}
return resp, nil
}
// Put 将文件流保存到指定目录
func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
defer file.Close()
_, err := handler.Client.PutFile(ctx, dst, file)
return err
return handler.Client.Upload(ctx, dst, int(size), file)
}
// Delete 删除一个或多个文件,
@ -67,7 +99,11 @@ func (handler Driver) Source(
isDownload bool,
speed int,
) (string, error) {
return "", errors.New("未实现")
res, err := handler.Client.Meta(ctx, "", path)
if err == nil {
return res.DownloadURL, nil
}
return "", err
}
// Token 获取上传会话URL

@ -1,6 +1,7 @@
package onedrive
import (
"io"
"sync"
)
@ -29,6 +30,7 @@ type FileInfo struct {
Size uint64 `json:"size"`
Image imageInfo `json:"image"`
ParentReference parentReference `json:"parentReference"`
DownloadURL string `json:"@microsoft.graph.downloadUrl"`
}
type imageInfo struct {
@ -79,4 +81,18 @@ type ThumbResponse struct {
Value []map[string]interface{} `json:"value"`
}
// Chunk 文件分片
type Chunk struct {
Offset int
ChunkSize int
Total int
Retried int
Reader io.Reader
}
// IsLast 返回是否为最后一个分片
func (chunk *Chunk) IsLast() bool {
return chunk.Total-chunk.Offset == chunk.ChunkSize
}
var callbackSignal sync.Map

@ -186,8 +186,9 @@ func (fs *FileSystem) DispatchHandler() error {
case "onedrive":
client, err := onedrive.NewClient(currentPolicy)
fs.Handler = onedrive.Driver{
Policy: currentPolicy,
Client: client,
Policy: currentPolicy,
Client: client,
HTTPClient: request.HTTPClient{},
}
return err
default:

@ -85,7 +85,19 @@ func WithCredential(instance auth.Auth, ttl int64) Option {
// WithHeader 设置请求Header
func WithHeader(header http.Header) Option {
return optionFunc(func(o *options) {
o.header = header
for k, v := range header {
o.header[k] = v
}
})
}
// WithoutHeader 设置清除请求Header
func WithoutHeader(header []string) Option {
return optionFunc(func(o *options) {
for _, v := range header {
delete(o.header, v)
}
})
}

@ -280,7 +280,7 @@ loop:
// The file doesn't implement the optional DeadPropsHolder interface, so
// all patches are forbidden.
pstat := Propstat{Status: http.StatusForbidden}
pstat := Propstat{Status: http.StatusOK}
for _, patch := range patches {
for _, p := range patch.Props {
pstat.Props = append(pstat.Props, Property{XMLName: p.XMLName})

@ -160,7 +160,7 @@ func (service *OneDriveCallback) PreProcess(c *gin.Context) serializer.Response
callbackSession := callbackSessionRaw.(*serializer.UploadSession)
// 获取文件信息
info, err := fs.Handler.(onedrive.Driver).Client.Meta(context.Background(), service.ID)
info, err := fs.Handler.(onedrive.Driver).Client.Meta(context.Background(), service.ID, "")
if err != nil {
return serializer.Err(serializer.CodeUploadFailed, "文件元信息查询失败", err)
}

Loading…
Cancel
Save