From 015ccd502606c1ba22a8a3178cd9896b3a7458e4 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sun, 20 Mar 2022 11:20:09 +0800 Subject: [PATCH] Feat: use new ChunkManager for OneDrive API client --- pkg/filesystem/chunk/chunk.go | 16 ++++++ pkg/filesystem/driver/onedrive/api.go | 77 +++++++++------------------ 2 files changed, 41 insertions(+), 52 deletions(-) diff --git a/pkg/filesystem/chunk/chunk.go b/pkg/filesystem/chunk/chunk.go index e8a63f7..5eb2893 100644 --- a/pkg/filesystem/chunk/chunk.go +++ b/pkg/filesystem/chunk/chunk.go @@ -60,6 +60,7 @@ func (c *ChunkGroup) Process(processor ChunkProcessFunc) error { return err } + util.Log().Debug("Chunk %d processed", c.currentIndex) return nil } @@ -68,6 +69,16 @@ func (c *ChunkGroup) Start() int64 { return int64(uint64(c.Index()) * c.chunkSize) } +// Total returns the total length current chunk +func (c *ChunkGroup) Total() int64 { + return int64(c.fileInfo.Size) +} + +// RangeHeader returns header value of Content-Range +func (c *ChunkGroup) RangeHeader() string { + return fmt.Sprintf("bytes %d-%d/%d", c.Start(), c.Start()+c.Length()-1, c.Total()) +} + // Index returns current chunk index, starts from 0 func (c *ChunkGroup) Index() int { return c.currentIndex @@ -89,3 +100,8 @@ func (c *ChunkGroup) Length() int64 { return int64(contentLength) } + +// IsLast returns if current chunk is the last one +func (c *ChunkGroup) IsLast() bool { + return c.Index() == int(c.chunkNum-1) +} diff --git a/pkg/filesystem/driver/onedrive/api.go b/pkg/filesystem/driver/onedrive/api.go index 1b6d9e1..2aa49de 100644 --- a/pkg/filesystem/driver/onedrive/api.go +++ b/pkg/filesystem/driver/onedrive/api.go @@ -1,7 +1,6 @@ package onedrive import ( - "bytes" "context" "encoding/json" "errors" @@ -18,6 +17,8 @@ import ( model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cache" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk/backoff" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/request" @@ -30,7 +31,8 @@ const ( // ChunkSize 服务端中转分片上传分片大小 ChunkSize uint64 = 10 * 1024 * 1024 // ListRetry 列取请求重试次数 - ListRetry = 1 + ListRetry = 1 + chunkRetrySleep = time.Second * 5 ) // GetSourcePath 获取文件的绝对路径 @@ -220,28 +222,21 @@ func (client *Client) GetUploadSessionStatus(ctx context.Context, uploadURL stri } // UploadChunk 上传分片 -func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *Chunk) (*UploadSessionResponse, error) { +func (client *Client) UploadChunk(ctx context.Context, uploadURL string, content io.Reader, current *chunk.ChunkGroup) (*UploadSessionResponse, error) { res, err := client.request( - ctx, "PUT", uploadURL, bytes.NewReader(chunk.Data[0:chunk.ChunkSize]), - request.WithContentLength(int64(chunk.ChunkSize)), + ctx, "PUT", uploadURL, content, + request.WithContentLength(current.Length()), request.WithHeader(http.Header{ - "Content-Range": {fmt.Sprintf("bytes %d-%d/%d", chunk.Offset, chunk.Offset+chunk.ChunkSize-1, chunk.Total)}, + "Content-Range": {current.RangeHeader()}, }), request.WithoutHeader([]string{"Authorization", "Content-Type"}), request.WithTimeout(time.Duration(300)*time.Second), ) if err != nil { - // 如果重试次数小于限制,5秒后重试 - if chunk.Retried < model.GetIntSetting("onedrive_chunk_retries", 1) { - chunk.Retried++ - util.Log().Debug("分片偏移%d上传失败[%s],5秒钟后重试", chunk.Offset, err) - time.Sleep(time.Duration(5) * time.Second) - return client.UploadChunk(ctx, uploadURL, chunk) - } - return nil, err + return nil, fmt.Errorf("failed to upload OneDrive chunk #%d: %w", current.Index(), err) } - if chunk.IsLast() { + if current.IsLast() { return nil, nil } @@ -282,46 +277,24 @@ func (client *Client) Upload(ctx context.Context, file fsctx.FileHeader) error { return err } - offset := 0 - chunkNum := size / int(ChunkSize) - if size%int(ChunkSize) != 0 { - chunkNum++ - } - - chunkData := make([]byte, ChunkSize) - - for i := 0; i < chunkNum; i++ { - select { - case <-ctx.Done(): - util.Log().Debug("OneDrive 客户端取消") - return ErrClientCanceled - default: - // 分块 - chunkSize := int(ChunkSize) - if size-offset < chunkSize { - chunkSize = size - offset - } - - // 因为后面需要错误重试,这里要把分片内容读到内存中 - chunkContent := chunkData[:chunkSize] - _, err := io.ReadFull(file, chunkContent) + // Initial chunk groups + chunks := chunk.NewChunkGroup(file, client.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{ + Max: model.GetIntSetting("onedrive_chunk_retries", 5), + Sleep: chunkRetrySleep, + }) - chunk := Chunk{ - Offset: offset, - ChunkSize: chunkSize, - Total: size, - Data: chunkContent, - } + uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error { + _, err := client.UploadChunk(ctx, uploadURL, content, current) + return err + } - // 上传 - _, err = client.UploadChunk(ctx, uploadURL, &chunk) - if err != nil { - return err - } - offset += chunkSize + // upload chunks + for chunks.Next() { + if err := chunks.Process(uploadFunc); err != nil { + return fmt.Errorf("failed to upload chunk #%d: %w", chunks.Index(), err) } - } + return nil } @@ -354,7 +327,7 @@ func (client *Client) SimpleUpload(ctx context.Context, dst string, body io.Read if v, ok := ctx.Value(fsctx.RetryCtx).(int); ok { retried = v } - if retried < model.GetIntSetting("onedrive_chunk_retries", 1) { + if retried < model.GetIntSetting("onedrive_chunk_retries", 5) { retried++ util.Log().Debug("文件[%s]上传失败[%s],5秒钟后重试", dst, err) time.Sleep(time.Duration(5) * time.Second)