From effbc8607ecf792abf9d2961fdd191ae2300c54f Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sun, 13 Mar 2022 16:17:20 +0800 Subject: [PATCH] Refactor: use chunk manager to manage resume upload in server side --- .../chunk/backoff}/backoff.go | 9 +- pkg/filesystem/chunk/chunk.go | 91 +++++++++++++++++++ pkg/filesystem/driver/remote/client.go | 43 +++------ pkg/retry/chunk.go | 29 ------ 4 files changed, 112 insertions(+), 60 deletions(-) rename pkg/{retry => filesystem/chunk/backoff}/backoff.go (74%) create mode 100644 pkg/filesystem/chunk/chunk.go delete mode 100644 pkg/retry/chunk.go diff --git a/pkg/retry/backoff.go b/pkg/filesystem/chunk/backoff/backoff.go similarity index 74% rename from pkg/retry/backoff.go rename to pkg/filesystem/chunk/backoff/backoff.go index a64c7c7..0b395f1 100644 --- a/pkg/retry/backoff.go +++ b/pkg/filesystem/chunk/backoff/backoff.go @@ -1,10 +1,11 @@ -package retry +package backoff import "time" // Backoff used for retry sleep backoff type Backoff interface { Next() bool + Reset() } // ConstantBackoff implements Backoff interface with constant sleep time @@ -15,7 +16,7 @@ type ConstantBackoff struct { tried int } -func (c ConstantBackoff) Next() bool { +func (c *ConstantBackoff) Next() bool { c.tried++ if c.tried >= c.Max { return false @@ -24,3 +25,7 @@ func (c ConstantBackoff) Next() bool { time.Sleep(c.Sleep) return true } + +func (c *ConstantBackoff) Reset() { + c.tried = 0 +} diff --git a/pkg/filesystem/chunk/chunk.go b/pkg/filesystem/chunk/chunk.go new file mode 100644 index 0000000..dc98fb4 --- /dev/null +++ b/pkg/filesystem/chunk/chunk.go @@ -0,0 +1,91 @@ +package chunk + +import ( + "context" + "fmt" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk/backoff" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" + "github.com/cloudreve/Cloudreve/v3/pkg/util" + "io" +) + +// ChunkProcessFunc callback function for processing a chunk +type ChunkProcessFunc func(c *ChunkGroup, chunk io.Reader) error + 
+
// ChunkGroup manage groups of chunks +type ChunkGroup struct { + file fsctx.FileHeader + chunkSize uint64 + backoff backoff.Backoff + + fileInfo *fsctx.UploadTaskInfo + currentIndex int + chunkNum uint64 +} + +func NewChunkGroup(file fsctx.FileHeader, chunkSize uint64, backoff backoff.Backoff) *ChunkGroup { + c := &ChunkGroup{ + file: file, + chunkSize: chunkSize, + backoff: backoff, + fileInfo: file.Info(), + currentIndex: -1, + } + + if c.chunkSize == 0 { + c.chunkSize = c.fileInfo.Size + } + + c.chunkNum = c.fileInfo.Size / c.chunkSize + if c.fileInfo.Size%c.chunkSize != 0 || c.fileInfo.Size == 0 { + c.chunkNum++ + } + + return c +} + +// Process a chunk with retry logic +func (c *ChunkGroup) Process(processor ChunkProcessFunc) error { + err := processor(c, io.LimitReader(c.file, int64(c.chunkSize))) + if err != nil { + if err != context.Canceled && c.file.Seekable() && c.backoff.Next() { + if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil { + return fmt.Errorf("failed to seek back to chunk start: %w, last error: %s", seekErr, err) + } + + util.Log().Debug("Retrying chunk %d, last error: %s", c.currentIndex, err) + return c.Process(processor) + } + + return err + } + + return nil +} + +// Start returns the byte index of current chunk +func (c *ChunkGroup) Start() int64 { + return int64(uint64(c.Index()) * c.chunkSize) +} + +// Index returns current chunk index, starts from 0 +func (c *ChunkGroup) Index() int { + return c.currentIndex +} + +// Next switch to next chunk, returns whether all chunks are processed +func (c *ChunkGroup) Next() bool { + c.currentIndex++ + c.backoff.Reset() + return c.currentIndex < int(c.chunkNum) +} + +// Length returns the length of current chunk +func (c *ChunkGroup) Length() int64 { + contentLength := c.chunkSize + if c.Index() == int(c.chunkNum-1) { + contentLength = c.fileInfo.Size - c.chunkSize*(c.chunkNum-1) + } + + return int64(contentLength) +} diff --git a/pkg/filesystem/driver/remote/client.go
b/pkg/filesystem/driver/remote/client.go index 073ae40..f052337 100644 --- a/pkg/filesystem/driver/remote/client.go +++ b/pkg/filesystem/driver/remote/client.go @@ -6,9 +6,10 @@ import ( "fmt" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/auth" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk/backoff" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/request" - "github.com/cloudreve/Cloudreve/v3/pkg/retry" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/gofrs/uuid" "io" @@ -84,36 +85,20 @@ func (c *remoteClient) Upload(ctx context.Context, file fsctx.FileHeader) error overwrite := fileInfo.Mode&fsctx.Overwrite == fsctx.Overwrite // Upload chunks - offset := uint64(0) - chunkSize := session.Policy.OptionsSerialized.ChunkSize - if chunkSize == 0 { - chunkSize = fileInfo.Size - } + chunks := chunk.NewChunkGroup(file, c.policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{ + Max: model.GetIntSetting("onedrive_chunk_retries", 1), + Sleep: chunkRetrySleep, + }) - chunkNum := fileInfo.Size / chunkSize - if fileInfo.Size%chunkSize != 0 { - chunkNum++ + uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error { + return c.uploadChunk(ctx, session.Key, current.Index(), content, overwrite, current.Length()) } - for i := 0; i < int(chunkNum); i++ { - uploadFunc := func(index int, chunk io.Reader) error { - contentLength := chunkSize - if index == int(chunkNum-1) { - contentLength = fileInfo.Size - chunkSize*(chunkNum-1) - } - - return c.uploadChunk(ctx, session.Key, index, chunk, overwrite, contentLength) - } - - if err := retry.Chunk(i, chunkSize, file, uploadFunc, retry.ConstantBackoff{ - Max: model.GetIntSetting("onedrive_chunk_retries", 1), - Sleep: chunkRetrySleep, - }); err != nil { + for chunks.Next() { + if err := chunks.Process(uploadFunc); err != nil { // TODO 删除上传会话 - return 
fmt.Errorf("failed to upload chunk #%d: %w", i, err) + return fmt.Errorf("failed to upload chunk #%d: %w", chunks.Index(), err) } - - offset += chunkSize } return nil @@ -162,14 +147,14 @@ func (c *remoteClient) GetUploadURL(ttl int64, sessionID string) (string, string return req.URL.String(), req.Header["Authorization"][0], nil } -func (c *remoteClient) uploadChunk(ctx context.Context, sessionID string, index int, chunk io.Reader, overwrite bool, size uint64) error { +func (c *remoteClient) uploadChunk(ctx context.Context, sessionID string, index int, chunk io.Reader, overwrite bool, size int64) error { resp, err := c.httpClient.Request( "POST", fmt.Sprintf("upload/%s?chunk=%d", sessionID, index), - io.LimitReader(chunk, int64(size)), + chunk, request.WithContext(ctx), request.WithTimeout(time.Duration(0)), - request.WithContentLength(int64(size)), + request.WithContentLength(size), request.WithHeader(map[string][]string{OverwriteHeader: {fmt.Sprintf("%t", overwrite)}}), ).CheckHTTPResponse(200).DecodeResponse() if err != nil { diff --git a/pkg/retry/chunk.go b/pkg/retry/chunk.go deleted file mode 100644 index f8ce75a..0000000 --- a/pkg/retry/chunk.go +++ /dev/null @@ -1,29 +0,0 @@ -package retry - -import ( - "context" - "fmt" - "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" - "github.com/cloudreve/Cloudreve/v3/pkg/util" - "io" -) - -type ChunkProcessFunc func(index int, chunk io.Reader) error - -func Chunk(index int, chunkSize uint64, file fsctx.FileHeader, processor ChunkProcessFunc, backoff Backoff) error { - err := processor(index, file) - if err != nil { - if err != context.Canceled && file.Seekable() && backoff.Next() { - if _, seekErr := file.Seek(int64(uint64(index)*chunkSize), io.SeekStart); err != nil { - return fmt.Errorf("failed to seek back to chunk start: %w, last error: %w", seekErr, err) - } - - util.Log().Debug("Retrying chunk %d, last error: %s", index, err) - return Chunk(index, chunkSize, file, processor, backoff) - } - - return err 
- } - - return nil -}