From 5fb9e946fca0d17cd3d372fe23753ad6685ffe66 Mon Sep 17 00:00:00 2001 From: withchao <48119764+withchao@users.noreply.github.com> Date: Thu, 14 Sep 2023 14:38:07 +0800 Subject: [PATCH] feat: url to im s3 (#1067) * fix: repeated modification session notification * fix: repeated modification session notification * fix: jpush return a nil pointer panic * fix: push redis pkg * fix: OANotification * feat: add rpc GetConversationNeedOfflinePushUserIDs * update pkg * cicd: robot automated Change * offlinePushMsg * conversation * conversation * cicd: robot automated Change * conversation * cicd: robot automated Change * conversation * url 2 im s3 * url 2 im s3 * cicd: robot automated Change * url 2 im s3 --------- Co-authored-by: withchao --- go.work | 1 + tools/url2im/go.mod | 20 ++ tools/url2im/go.sum | 33 +++ tools/url2im/main.go | 84 ++++++++ tools/url2im/pkg/api.go | 112 ++++++++++ tools/url2im/pkg/buffer.go | 96 +++++++++ tools/url2im/pkg/config.go | 16 ++ tools/url2im/pkg/http.go | 7 + tools/url2im/pkg/manage.go | 385 +++++++++++++++++++++++++++++++++++ tools/url2im/pkg/md5.go | 29 +++ tools/url2im/pkg/progress.go | 41 ++++ 11 files changed, 824 insertions(+) create mode 100644 tools/url2im/go.mod create mode 100644 tools/url2im/go.sum create mode 100644 tools/url2im/main.go create mode 100644 tools/url2im/pkg/api.go create mode 100644 tools/url2im/pkg/buffer.go create mode 100644 tools/url2im/pkg/config.go create mode 100644 tools/url2im/pkg/http.go create mode 100644 tools/url2im/pkg/manage.go create mode 100644 tools/url2im/pkg/md5.go create mode 100644 tools/url2im/pkg/progress.go diff --git a/go.work b/go.work index fd19bde42..f01eace52 100644 --- a/go.work +++ b/go.work @@ -11,4 +11,5 @@ use ( ./tools/versionchecker ./tools/yamlfmt ./tools/component + ./tools/url2im ) diff --git a/tools/url2im/go.mod b/tools/url2im/go.mod new file mode 100644 index 000000000..b1d0a5bac --- /dev/null +++ b/tools/url2im/go.mod @@ -0,0 +1,20 @@ +module 
github.com/openimsdk/open-im-server/v3/tools/url2im + +go 1.20 + +require ( + github.com/OpenIMSDK/protocol v0.0.21 + github.com/kelindar/bitmap v1.5.1 +) + +require ( + github.com/golang/protobuf v1.5.3 // indirect + github.com/kelindar/simd v1.1.2 // indirect + github.com/klauspost/cpuid/v2 v2.2.4 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + google.golang.org/grpc v1.56.2 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/tools/url2im/go.sum b/tools/url2im/go.sum new file mode 100644 index 000000000..9985d4828 --- /dev/null +++ b/tools/url2im/go.sum @@ -0,0 +1,33 @@ +github.com/OpenIMSDK/protocol v0.0.21 h1:5H6H+hJ9d/VgRqttvxD/zfK9Asd+4M8Eknk5swSbUVY= +github.com/OpenIMSDK/protocol v0.0.21/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/kelindar/bitmap v1.5.1 h1:+ZmZdwHbJ+CGE+q/aAJ74KJSnp0vOlGD7KY5x51mVzk= +github.com/kelindar/bitmap v1.5.1/go.mod h1:j3qZjxH9s4OtvsnFTP2bmPkjqil9Y2xQlxPYHexasEA= +github.com/kelindar/simd v1.1.2 h1:KduKb+M9cMY2HIH8S/cdJyD+5n5EGgq+Aeeleos55To= +github.com/kelindar/simd v1.1.2/go.mod h1:inq4DFudC7W8L5fhxoeZflLRNpWSs0GNx6MlWFvuvr0= +github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= +github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= 
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/grpc v1.56.2 h1:fVRFRnXvU+x6C4IlHZewvJOVHoOv1TUuQyoRsYnB4bI= +google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/tools/url2im/main.go b/tools/url2im/main.go new file mode 100644 index 000000000..ee159b5e8 --- /dev/null +++ b/tools/url2im/main.go @@ -0,0 +1,84 @@ +package main + +import ( + "flag" + "log" + "os" + "path/filepath" + "time" + + "github.com/openimsdk/open-im-server/v3/tools/url2im/pkg" +) + 
+/* Example task file (take.txt), one JSON object per line:
+{"url":"http://xxx/xxxx","name":"xxxx","contentType":"image/jpeg"}
+{"url":"http://xxx/xxxx","name":"xxxx","contentType":"image/jpeg"}
+{"url":"http://xxx/xxxx","name":"xxxx","contentType":"image/jpeg"}
+*/
+
+func main() {
+	var conf pkg.Config                                                  // entries marked with * are required
+	flag.StringVar(&conf.TaskPath, "task", "take.txt", "task path")      // task file*
+	flag.StringVar(&conf.ProgressPath, "progress", "", "progress path")  // progress log file
+	flag.IntVar(&conf.Concurrency, "concurrency", 1, "concurrency num")  // number of concurrent workers
+	flag.IntVar(&conf.Retry, "retry", 1, "retry num")                    // number of attempts per task
+	flag.StringVar(&conf.TempDir, "temp", "", "temp dir")                // temporary directory
+	flag.Int64Var(&conf.CacheSize, "cache", 1024*1024*100, "cache size") // cache size (larger downloads are spilled to disk)
+	flag.Int64Var((*int64)(&conf.Timeout), "timeout", 5000, "timeout")   // request timeout (milliseconds)
+	flag.StringVar(&conf.Api, "api", "http://127.0.0.1:10002", "api")    // IM API address*
+	flag.StringVar(&conf.UserID, "userID", "openIM123456", "userID")     // IM admin user
+	flag.StringVar(&conf.Secret, "secret", "openIM123", "secret")        // IM config secret
+	flag.Parse()
+	if !filepath.IsAbs(conf.TaskPath) {
+		var err error
+		conf.TaskPath, err = filepath.Abs(conf.TaskPath)
+		if err != nil {
+			log.Println("get abs path err:", err)
+			return
+		}
+	}
+	if conf.ProgressPath == "" {
+		conf.ProgressPath = conf.TaskPath + ".progress.txt"
+	} else if !filepath.IsAbs(conf.ProgressPath) {
+		var err error
+		conf.ProgressPath, err = filepath.Abs(conf.ProgressPath)
+		if err != nil {
+			log.Println("get abs path err:", err)
+			return
+		}
+	}
+	if conf.TempDir == "" {
+		conf.TempDir = conf.TaskPath + ".temp"
+	}
+	if info, err := os.Stat(conf.TempDir); err == nil {
+		if !info.IsDir() {
+			log.Printf("temp dir %s is not dir\n", conf.TempDir)
+			return
+		}
+	} else if os.IsNotExist(err) {
+		if err := os.MkdirAll(conf.TempDir, os.ModePerm); err != nil {
+			log.Printf("mkdir temp dir %s err %+v\n", conf.TempDir, err)
+			return
+		}
+		defer os.RemoveAll(conf.TempDir) // only clean up a directory this run created
+	} else {
+		log.Println("get temp dir err:", err)
+		return
+	}
+	if conf.Concurrency <= 0 {
+		conf.Concurrency = 1
+	}
+	if conf.Retry <= 0 {
+		conf.Retry = 1
+	}
+	if conf.CacheSize <= 0 {
+		conf.CacheSize = 1024 * 1024 * 100 // 100M
+	}
+	if conf.Timeout <= 0 {
+		conf.Timeout = 5000
+	}
+	conf.Timeout = conf.Timeout * time.Millisecond // the flag value is a plain millisecond count
+	if err := pkg.Run(conf); err != nil {
+		log.Println("main err:", err)
+	}
+}
diff --git a/tools/url2im/pkg/api.go b/tools/url2im/pkg/api.go
new file mode 100644
index 000000000..1fc3813bb
--- /dev/null
+++ b/tools/url2im/pkg/api.go
@@ -0,0 +1,120 @@
+package pkg
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+
+	"github.com/OpenIMSDK/protocol/auth"
+	"github.com/OpenIMSDK/protocol/constant"
+	"github.com/OpenIMSDK/protocol/third"
+)
+
+// Api is a minimal HTTP client for the OpenIM REST API.
+type Api struct {
+	Api    string
+	UserID string
+	Secret string
+	Token  string
+	Client *http.Client
+}
+
+// apiPost sends req as JSON to path and decodes the "data" field of the
+// response envelope into resp (when resp is non-nil). ctx must carry a
+// non-empty "operationID" string value, which is forwarded as a header.
+func (a *Api) apiPost(ctx context.Context, path string, req any, resp any) error {
+	operationID, _ := ctx.Value("operationID").(string)
+	if operationID == "" {
+		return errors.New("call api operationID is empty")
+	}
+	reqBody, err := json.Marshal(req)
+	if err != nil {
+		return err
+	}
+	request, err := http.NewRequestWithContext(ctx, http.MethodPost, a.Api+path, bytes.NewReader(reqBody))
+	if err != nil {
+		return err
+	}
+	DefaultRequestHeader(request.Header)
+	request.ContentLength = int64(len(reqBody))
+	request.Header.Set("Content-Type", "application/json")
+	request.Header.Set("operationID", operationID)
+	if a.Token != "" {
+		request.Header.Set("token", a.Token)
+	}
+	response, err := a.Client.Do(request)
+	if err != nil {
+		return err
+	}
+	defer response.Body.Close()
+	body, err := io.ReadAll(response.Body)
+	if err != nil {
+		return err
+	}
+	if response.StatusCode != http.StatusOK {
+		return fmt.Errorf("api %s status %s body %s", path, response.Status, body)
+	}
+	var baseResponse struct {
+		ErrCode int             `json:"errCode"`
+		ErrMsg  string          `json:"errMsg"`
+		ErrDlt  string          `json:"errDlt"`
+		Data    json.RawMessage `json:"data"`
+	}
+	if err := json.Unmarshal(body, &baseResponse); err != nil {
+		return err
+	}
+	if baseResponse.ErrCode != 0 {
+		return fmt.Errorf("api %s errCode %d errMsg %s errDlt %s", path, baseResponse.ErrCode, baseResponse.ErrMsg, baseResponse.ErrDlt)
+	}
+	if resp != nil {
+		if err := json.Unmarshal(baseResponse.Data, resp); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// GetToken requests an admin-platform token for UserID using Secret.
+func (a *Api) GetToken(ctx context.Context) (string, error) {
+	req := auth.UserTokenReq{
+		UserID:     a.UserID,
+		Secret:     a.Secret,
+		PlatformID: constant.AdminPlatformID,
+	}
+	var resp auth.UserTokenResp
+	if err := a.apiPost(ctx, "/auth/user_token", &req, &resp); err != nil {
+		return "", err
+	}
+	return resp.Token, nil
+}
+
+// GetPartLimit fetches the multipart size limits from the server.
+func (a *Api) GetPartLimit(ctx context.Context) (*third.PartLimitResp, error) {
+	var resp third.PartLimitResp
+	if err := a.apiPost(ctx, "/object/part_limit", &third.PartLimitReq{}, &resp); err != nil {
+		return nil, err
+	}
+	return &resp, nil
+}
+
+// InitiateMultipartUpload starts a multipart upload described by req.
+func (a *Api) InitiateMultipartUpload(ctx context.Context, req *third.InitiateMultipartUploadReq) (*third.InitiateMultipartUploadResp, error) {
+	var resp third.InitiateMultipartUploadResp
+	if err := a.apiPost(ctx, "/object/initiate_multipart_upload", req, &resp); err != nil {
+		return nil, err
+	}
+	return &resp, nil
+}
+
+// CompleteMultipartUpload finishes an upload and returns the object's URL.
+func (a *Api) CompleteMultipartUpload(ctx context.Context, req *third.CompleteMultipartUploadReq) (string, error) {
+	var resp third.CompleteMultipartUploadResp
+	if err := a.apiPost(ctx, "/object/complete_multipart_upload", req, &resp); err != nil {
+		return "", err
+	}
+	return resp.Url, nil
+}
diff --git a/tools/url2im/pkg/buffer.go b/tools/url2im/pkg/buffer.go
new file mode 100644
index 000000000..008400926
--- /dev/null
+++ b/tools/url2im/pkg/buffer.go
@@ -0,0 +1,101 @@
+package pkg
+
+import (
+	"bytes"
+	"io"
+	"os"
+)
+
+// ReadSeekSizeCloser is a seekable, closable reader that knows its total size.
+type ReadSeekSizeCloser interface {
+	io.ReadSeekCloser
+	Size() int64
+}
+
+// NewReader buffers r so that it can be read repeatedly. Payloads of at most
+// max bytes are held in memory; larger payloads are spilled to a temporary
+// file at path, which is removed when the returned reader is closed.
+func NewReader(r io.Reader, max int64, path string) (ReadSeekSizeCloser, error) {
+	// Read at most max+1 bytes: the extra byte tells us whether the payload
+	// exceeds the in-memory limit without allocating max bytes up front.
+	head, err := io.ReadAll(io.LimitReader(r, max+1))
+	if err != nil {
+		return nil, err
+	}
+	if int64(len(head)) <= max {
+		return &memoryBuffer{r: bytes.NewReader(head)}, nil
+	}
+	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666)
+	if err != nil {
+		return nil, err
+	}
+	var ok bool
+	defer func() {
+		if !ok {
+			_ = f.Close()
+			_ = os.Remove(path)
+		}
+	}()
+	if _, err := f.Write(head); err != nil {
+		return nil, err
+	}
+	rest, err := io.Copy(f, r)
+	if err != nil {
+		return nil, err
+	}
+	if _, err := f.Seek(0, io.SeekStart); err != nil {
+		return nil, err
+	}
+	ok = true
+	return &fileBuffer{f: f, n: int64(len(head)) + rest}, nil
+}
+
+// fileBuffer is a ReadSeekSizeCloser backed by a temporary file of n bytes.
+type fileBuffer struct {
+	n int64
+	f *os.File
+}
+
+func (r *fileBuffer) Read(p []byte) (n int, err error) {
+	return r.f.Read(p)
+}
+
+func (r *fileBuffer) Seek(offset int64, whence int) (int64, error) {
+	return r.f.Seek(offset, whence)
+}
+
+func (r *fileBuffer) Size() int64 {
+	return r.n
+}
+
+// Close closes the temporary file and removes it from disk. The file is
+// removed even if closing fails; the close error takes precedence.
+func (r *fileBuffer) Close() error {
+	name := r.f.Name()
+	closeErr := r.f.Close()
+	removeErr := os.Remove(name)
+	if closeErr != nil {
+		return closeErr
+	}
+	return removeErr
+}
+
+// memoryBuffer is a ReadSeekSizeCloser backed by an in-memory byte slice.
+type memoryBuffer struct {
+	r *bytes.Reader
+}
+
+func (r *memoryBuffer) Read(p []byte) (n int, err error) {
+	return r.r.Read(p)
+}
+
+func (r *memoryBuffer) Seek(offset int64, whence int) (int64, error) {
+	return r.r.Seek(offset, whence)
+}
+
+func (r *memoryBuffer) Close() error {
+	return nil
+}
+
+func (r *memoryBuffer) Size() int64 {
+	return r.r.Size()
+}
diff --git a/tools/url2im/pkg/config.go b/tools/url2im/pkg/config.go
new file mode 100644
index 000000000..020395262
--- /dev/null
+++ b/tools/url2im/pkg/config.go
@@ -0,0 +1,17 @@
+package pkg
+
+import "time"
+
+// Config holds the settings for one url2im import run.
+type Config struct {
+	TaskPath     string        // task file, one JSON Upload per line
+	ProgressPath string        // file recording the indexes already uploaded
+	Concurrency  int           // number of concurrent workers
+	Retry        int           // attempts per task
+	Timeout      time.Duration // HTTP client timeout
+	Api          string        // base address of the IM API
+	UserID       string        // IM admin user ID
+	Secret       string        // IM config secret
+	TempDir      string        // directory for spilled downloads
+	CacheSize    int64         // largest download kept in memory, in bytes
+}
diff --git a/tools/url2im/pkg/http.go b/tools/url2im/pkg/http.go
new file mode 100644
index 000000000..32e128524
--- /dev/null
+++ b/tools/url2im/pkg/http.go
@@ -0,0 +1,8 @@
+package pkg
+
+import "net/http"
+
+// DefaultRequestHeader sets a browser-like User-Agent on header.
+func DefaultRequestHeader(header http.Header) {
+	header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36")
+}
diff --git a/tools/url2im/pkg/manage.go b/tools/url2im/pkg/manage.go
new file mode 100644
index 000000000..a68078f85
--- /dev/null
+++ b/tools/url2im/pkg/manage.go
@@ -0,0 +1,409 @@
+package pkg
+
+import (
+	"bufio"
+	"context"
+	"crypto/md5"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"net/url"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/OpenIMSDK/protocol/third"
+)
+
+// Upload describes one file to import: its source URL, target name and content type.
+type Upload struct {
+	URL         string `json:"url"`
+	Name        string `json:"name"`
+	ContentType string `json:"contentType"`
+}
+
+// Task is an Upload together with its 1-based position among the non-empty task lines.
+type Task struct {
+	Index  int
+	Upload Upload
+}
+
+// PartInfo holds the multipart layout and the MD5 digests computed for one file.
+type PartInfo struct {
+	ContentType string
+	PartSize    int64
+	PartNum     int
+	FileMd5     string
+	PartMd5     string
+	PartSizes   []int64
+	PartMd5s    []string
+}
+
+// Run executes the import described by conf.
+func Run(conf Config) error {
+	m := &Manage{
+		prefix: time.Now().Format("20060102150405"),
+		conf:   &conf,
+		ctx:    context.Background(),
+	}
+	return m.Run()
+}
+
+// Manage drives one import run and holds the state shared by its workers.
+type Manage struct {
+	conf      *Config
+	ctx       context.Context
+	api       *Api
+	partLimit *third.PartLimitResp
+	prefix    string
+	tasks     chan Task
+	id        uint64
+	success   int64
+	failed    int64
+}
+
+// tempFilePath returns a fresh, unique path inside the configured temp directory.
+func (m *Manage) tempFilePath() string {
+	return filepath.Join(m.conf.TempDir, fmt.Sprintf("%s_%d", m.prefix, atomic.AddUint64(&m.id, 1)))
+}
+
+// Run logs in, loads earlier progress and then processes every pending task
+// from the task file with conf.Concurrency workers, trying each up to conf.Retry times.
+func (m *Manage) Run() error {
+	defer func(start time.Time) {
+		log.Printf("run time %s\n", time.Since(start))
+	}(time.Now())
+	m.api = &Api{
+		Api:    m.conf.Api,
+		UserID: m.conf.UserID,
+		Secret: m.conf.Secret,
+		Client: &http.Client{Timeout: m.conf.Timeout},
+	}
+	var err error
+	ctx := context.WithValue(m.ctx, "operationID", fmt.Sprintf("%s_init", m.prefix))
+	m.api.Token, err = m.api.GetToken(ctx)
+	if err != nil {
+		return err
+	}
+	m.partLimit, err = m.api.GetPartLimit(ctx)
+	if err != nil {
+		return err
+	}
+	progress, err := ReadProgress(m.conf.ProgressPath)
+	if err != nil {
+		return err
+	}
+	progressFile, err := os.OpenFile(m.conf.ProgressPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
+	if err != nil {
+		return err
+	}
+	defer progressFile.Close()
+	var mutex sync.Mutex
+	writeSuccessIndex := func(index int) {
+		mutex.Lock()
+		defer mutex.Unlock()
+		if _, err := progressFile.Write([]byte(strconv.Itoa(index) + "\n")); err != nil {
+			log.Printf("write progress err: %v\n", err)
+		}
+	}
+	file, err := os.Open(m.conf.TaskPath)
+	if err != nil {
+		return err
+	}
+	m.tasks = make(chan Task, m.conf.Concurrency*2)
+	go func() {
+		defer file.Close()
+		defer close(m.tasks)
+		scanner := bufio.NewScanner(file)
+		var (
+			index int
+			num   int
+		)
+		for scanner.Scan() {
+			line := strings.TrimSpace(scanner.Text())
+			if line == "" {
+				continue
+			}
+			index++
+			if progress.IsUploaded(index) {
+				log.Printf("index: %d already uploaded %s\n", index, line)
+				continue
+			}
+			var upload Upload
+			if err := json.Unmarshal([]byte(line), &upload); err != nil {
+				log.Printf("index: %d json.Unmarshal(%s) err: %v", index, line, err)
+				continue
+			}
+			num++
+			m.tasks <- Task{
+				Index:  index,
+				Upload: upload,
+			}
+		}
+		if err := scanner.Err(); err != nil {
+			log.Printf("read task file err: %v\n", err)
+		}
+		if num == 0 {
+			log.Println("mark all completed")
+		}
+	}()
+	var wg sync.WaitGroup
+	wg.Add(m.conf.Concurrency)
+	for i := 0; i < m.conf.Concurrency; i++ {
+		go func(tid int) {
+			defer wg.Done()
+			for task := range m.tasks {
+				var success bool
+				for n := 0; n < m.conf.Retry; n++ {
+					ctx := context.WithValue(m.ctx, "operationID", fmt.Sprintf("%s_%d_%d_%d", m.prefix, tid, task.Index, n+1))
+					if urlRaw, err := m.RunTask(ctx, task); err == nil {
+						writeSuccessIndex(task.Index)
+						log.Println("index:", task.Index, "upload success", "urlRaw", urlRaw)
+						success = true
+						break
+					} else {
+						log.Printf("index: %d upload: %+v err: %v", task.Index, task.Upload, err)
+					}
+				}
+				if success {
+					atomic.AddInt64(&m.success, 1)
+				} else {
+					atomic.AddInt64(&m.failed, 1)
+					log.Printf("index: %d upload: %+v failed", task.Index, task.Upload)
+				}
+			}
+		}(i + 1)
+	}
+	wg.Wait()
+	log.Printf("execution completed success %d failed %d\n", m.success, m.failed)
+	return nil
+}
+
+// RunTask downloads one task's URL, uploads the content to the IM object
+// storage part by part, and returns the resulting object URL.
+func (m *Manage) RunTask(ctx context.Context, task Task) (string, error) {
+	resp, err := m.HttpGet(ctx, task.Upload.URL)
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+	reader, err := NewReader(resp.Body, m.conf.CacheSize, m.tempFilePath())
+	if err != nil {
+		return "", err
+	}
+	defer reader.Close()
+	part, err := m.getPartInfo(ctx, reader, reader.Size())
+	if err != nil {
+		return "", err
+	}
+	var contentType string
+	if task.Upload.ContentType == "" {
+		contentType = part.ContentType
+	} else {
+		contentType = task.Upload.ContentType
+	}
+	initiateMultipartUploadResp, err := m.api.InitiateMultipartUpload(ctx, &third.InitiateMultipartUploadReq{
+		Hash:        part.PartMd5,
+		Size:        reader.Size(),
+		PartSize:    part.PartSize,
+		MaxParts:    -1,
+		Cause:       "batch-import",
+		Name:        task.Upload.Name,
+		ContentType: contentType,
+	})
+	if err != nil {
+		return "", err
+	}
+	// No upload instructions: the server answered with a final URL, nothing to send.
+	if initiateMultipartUploadResp.Upload == nil {
+		return initiateMultipartUploadResp.Url, nil
+	}
+	if _, err := reader.Seek(0, io.SeekStart); err != nil {
+		return "", err
+	}
+	uploadParts := make([]*third.SignPart, part.PartNum)
+	for _, part := range initiateMultipartUploadResp.Upload.Sign.Parts {
+		uploadParts[part.PartNumber-1] = part
+	}
+	for i, currentPartSize := range part.PartSizes {
+		md5Reader := NewMd5Reader(io.LimitReader(reader, currentPartSize))
+		if err := m.doPut(ctx, m.api.Client, initiateMultipartUploadResp.Upload.Sign, uploadParts[i], md5Reader, currentPartSize); err != nil {
+			return "", err
+		}
+		if md5val := md5Reader.Md5(); md5val != part.PartMd5s[i] {
+			return "", fmt.Errorf("upload part %d failed, md5 not match, expect %s, got %s", i, part.PartMd5s[i], md5val)
+		}
+	}
+	urlRaw, err := m.api.CompleteMultipartUpload(ctx, &third.CompleteMultipartUploadReq{
+		UploadID:    initiateMultipartUploadResp.Upload.UploadID,
+		Parts:       part.PartMd5s,
+		Name:        task.Upload.Name,
+		ContentType: contentType,
+		Cause:       "batch-import",
+	})
+	if err != nil {
+		return "", err
+	}
+	return urlRaw, nil
+}
+
+// partSize picks the part size for a file of the given size from the
+// server-provided limits; it fails for empty or oversized files.
+func (m *Manage) partSize(size int64) (int64, error) {
+	if size <= 0 {
+		return 0, errors.New("size must be greater than 0")
+	}
+	if size > m.partLimit.MaxPartSize*int64(m.partLimit.MaxNumSize) {
+		return 0, fmt.Errorf("size must be less than %db", m.partLimit.MaxPartSize*int64(m.partLimit.MaxNumSize))
+	}
+	if size <= m.partLimit.MinPartSize*int64(m.partLimit.MaxNumSize) {
+		return m.partLimit.MinPartSize, nil
+	}
+	partSize := size / int64(m.partLimit.MaxNumSize)
+	if size%int64(m.partLimit.MaxNumSize) != 0 {
+		partSize++
+	}
+	return partSize, nil
+}
+
+// partMD5 returns the MD5 of the comma-joined part digests.
+func (m *Manage) partMD5(parts []string) string {
+	s := strings.Join(parts, ",")
+	md5Sum := md5.Sum([]byte(s))
+	return hex.EncodeToString(md5Sum[:])
+}
+
+// getPartInfo reads fileSize bytes from r and computes the part layout, the
+// per-part and whole-file MD5 digests, and a sniffed content type.
+func (m *Manage) getPartInfo(ctx context.Context, r io.Reader, fileSize int64) (*PartInfo, error) {
+	partSize, err := m.partSize(fileSize)
+	if err != nil {
+		return nil, err
+	}
+	partNum := int(fileSize / partSize)
+	if fileSize%partSize != 0 {
+		partNum++
+	}
+	partSizes := make([]int64, partNum)
+	for i := 0; i < partNum; i++ {
+		partSizes[i] = partSize
+	}
+	partSizes[partNum-1] = fileSize - partSize*(int64(partNum)-1)
+	partMd5s := make([]string, partNum)
+	buf := make([]byte, 1024*8)
+	fileMd5 := md5.New()
+	var contentType string
+	for i := 0; i < partNum; i++ {
+		h := md5.New()
+		r := io.LimitReader(r, partSize)
+		for {
+			n, err := r.Read(buf)
+			// Per the io.Reader contract, consume the n bytes before looking at err.
+			if n > 0 {
+				if contentType == "" {
+					contentType = http.DetectContentType(buf[:n])
+				}
+				h.Write(buf[:n])
+				fileMd5.Write(buf[:n])
+			}
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				return nil, err
+			}
+		}
+		partMd5s[i] = hex.EncodeToString(h.Sum(nil))
+	}
+	partMd5Val := m.partMD5(partMd5s)
+	fileMd5val := hex.EncodeToString(fileMd5.Sum(nil))
+	return &PartInfo{
+		ContentType: contentType,
+		PartSize:    partSize,
+		PartNum:     partNum,
+		FileMd5:     fileMd5val,
+		PartMd5:     partMd5Val,
+		PartSizes:   partSizes,
+		PartMd5s:    partMd5s,
+	}, nil
+}
+
+// doPut uploads one part with an HTTP PUT to the signed URL, merging the
+// shared and per-part query parameters and headers. Only 2xx counts as success.
+func (m *Manage) doPut(ctx context.Context, client *http.Client, sign *third.AuthSignParts, part *third.SignPart, reader io.Reader, size int64) error {
+	rawURL := part.Url
+	if rawURL == "" {
+		rawURL = sign.Url
+	}
+	if len(sign.Query)+len(part.Query) > 0 {
+		u, err := url.Parse(rawURL)
+		if err != nil {
+			return err
+		}
+		query := u.Query()
+		for i := range sign.Query {
+			v := sign.Query[i]
+			query[v.Key] = v.Values
+		}
+		for i := range part.Query {
+			v := part.Query[i]
+			query[v.Key] = v.Values
+		}
+		u.RawQuery = query.Encode()
+		rawURL = u.String()
+	}
+	req, err := http.NewRequestWithContext(ctx, http.MethodPut, rawURL, reader)
+	if err != nil {
+		return err
+	}
+	for i := range sign.Header {
+		v := sign.Header[i]
+		req.Header[v.Key] = v.Values
+	}
+	for i := range part.Header {
+		v := part.Header[i]
+		req.Header[v.Key] = v.Values
+	}
+	req.ContentLength = size
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+	if resp.StatusCode/100 != 2 {
+		return fmt.Errorf("PUT %s part %d failed, status code %d, body %s", rawURL, part.PartNumber, resp.StatusCode, string(body))
+	}
+	return nil
+}
+
+// HttpGet performs a GET on url and returns the response only for 200 OK.
+func (m *Manage) HttpGet(ctx context.Context, url string) (*http.Response, error) {
+	request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+	DefaultRequestHeader(request.Header)
+	response, err := m.api.Client.Do(request)
+	if err != nil {
+		return nil, err
+	}
+	if response.StatusCode != http.StatusOK {
+		_ = response.Body.Close()
+		return nil, fmt.Errorf("http get %s status %s", url, response.Status)
+	}
+	return response, nil
+}
diff --git a/tools/url2im/pkg/md5.go b/tools/url2im/pkg/md5.go
new file mode 100644
index 000000000..0db5ba000
--- /dev/null
+++ b/tools/url2im/pkg/md5.go
@@ -0,0 +1,34 @@
+package pkg
+
+import (
+	"crypto/md5"
+	"encoding/hex"
+	"hash"
+	"io"
+)
+
+// NewMd5Reader wraps r so that everything read through it is MD5-hashed.
+func NewMd5Reader(r io.Reader) *Md5Reader {
+	return &Md5Reader{h: md5.New(), r: r}
+}
+
+// Md5Reader is an io.Reader that hashes the bytes passing through it.
+type Md5Reader struct {
+	h hash.Hash
+	r io.Reader
+}
+
+// Read reads from the underlying reader and hashes every byte returned,
+// including bytes delivered together with an error such as io.EOF.
+func (r *Md5Reader) Read(p []byte) (n int, err error) {
+	n, err = r.r.Read(p)
+	if n > 0 {
+		r.h.Write(p[:n])
+	}
+	return
+}
+
+// Md5 returns the hex-encoded MD5 of the bytes read so far.
+func (r *Md5Reader) Md5() string {
+	return hex.EncodeToString(r.h.Sum(nil))
+}
diff --git a/tools/url2im/pkg/progress.go b/tools/url2im/pkg/progress.go
new file mode 100644
index 000000000..2d6ef3891
--- /dev/null
+++ b/tools/url2im/pkg/progress.go
@@ -0,0 +1,48 @@
+package pkg
+
+import (
+	"bufio"
+	"os"
+	"strconv"
+
+	"github.com/kelindar/bitmap"
+)
+
+// ReadProgress loads the set of already-uploaded task indexes from path, one
+// decimal index per line. A missing file yields an empty Progress.
+func ReadProgress(path string) (*Progress, error) {
+	file, err := os.Open(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return &Progress{}, nil
+		}
+		return nil, err
+	}
+	defer file.Close()
+	scanner := bufio.NewScanner(file)
+	var upload bitmap.Bitmap
+	for scanner.Scan() {
+		index, err := strconv.Atoi(scanner.Text())
+		if err != nil || index < 0 {
+			continue
+		}
+		upload.Set(uint32(index))
+	}
+	if err := scanner.Err(); err != nil {
+		return nil, err
+	}
+	return &Progress{upload: upload}, nil
+}
+
+// Progress records which task indexes have been uploaded.
+type Progress struct {
+	upload bitmap.Bitmap
+}
+
+// IsUploaded reports whether index was recorded as uploaded; a nil Progress reports false.
+func (p *Progress) IsUploaded(index int) bool {
+	if p == nil {
+		return false
+	}
+	return p.upload.Contains(uint32(index))
+}