From 4bd9acbd6da58aa32a0bbb86fd2a0d798dd783b6 Mon Sep 17 00:00:00 2001 From: cncsmonster Date: Mon, 23 Oct 2023 00:13:18 +0800 Subject: [PATCH] fix: fix rest lint errors in pkg/common/db --- pkg/common/db/s3/cont/controller.go | 84 +++++++----- pkg/common/db/s3/cont/id.go | 2 + pkg/common/db/s3/cos/cos.go | 124 +++++++++-------- pkg/common/db/s3/minio/image.go | 19 +-- pkg/common/db/s3/minio/minio.go | 169 +++++++++++++++++------- pkg/common/db/s3/oss/oss.go | 119 +++++++++-------- pkg/common/db/table/relation/group.go | 2 +- pkg/common/db/table/relation/utils.go | 4 +- pkg/common/db/table/unrelation/msg.go | 3 + pkg/common/db/unrelation/mongo.go | 54 +++++--- pkg/common/db/unrelation/msg.go | 134 ++++++++++++------- pkg/common/db/unrelation/msg_convert.go | 68 +++++----- pkg/common/db/unrelation/super_group.go | 8 ++ pkg/common/db/unrelation/user.go | 21 ++- 14 files changed, 500 insertions(+), 311 deletions(-) mode change 100644 => 100755 pkg/common/db/s3/oss/oss.go mode change 100644 => 100755 pkg/common/db/unrelation/mongo.go mode change 100644 => 100755 pkg/common/db/unrelation/msg.go mode change 100644 => 100755 pkg/common/db/unrelation/user.go diff --git a/pkg/common/db/s3/cont/controller.go b/pkg/common/db/s3/cont/controller.go index 6faa997a9..7ff6fa755 100644 --- a/pkg/common/db/s3/cont/controller.go +++ b/pkg/common/db/s3/cont/controller.go @@ -46,6 +46,7 @@ func (c *Controller) HashPath(md5 string) string { func (c *Controller) NowPath() string { now := time.Now() + return path.Join( fmt.Sprintf("%04d", now.Year()), fmt.Sprintf("%02d", now.Month()), @@ -58,6 +59,7 @@ func (c *Controller) NowPath() string { func (c *Controller) UUID() string { id := uuid.New() + return hex.EncodeToString(id[:]) } @@ -92,20 +94,24 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64 partNumber++ } if maxParts > 0 && partNumber > 0 && partNumber < maxParts { - return nil, errors.New(fmt.Sprintf("too many parts: %d", partNumber)) + return nil, fmt.Errorf("too few parts: %d", partNumber) } - if info, err := c.impl.StatObject(ctx, c.HashPath(hash)); err == nil { + info, err := c.impl.StatObject(ctx, c.HashPath(hash)) + if err == nil { return nil, &HashAlreadyExistsError{Object: info} - } else if !c.impl.IsNotFound(err) { + } + if !c.impl.IsNotFound(err) { return nil, err } + if size <= partSize { // 预签名上传 key := path.Join(tempPath, c.NowPath(), fmt.Sprintf("%s_%d_%s.presigned", hash, size, c.UUID())) - rawURL, err := c.impl.PresignedPutObject(ctx, key, expire) - if err != nil { - return nil, err + rawURL, err2 := c.impl.PresignedPutObject(ctx, key, expire) + if err2 != nil { + return nil, err2 } + return &InitiateUploadResult{ UploadID: newMultipartUploadID(multipartUploadID{ Type: UploadTypePresigned, @@ -124,38 +130,39 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64 }, }, }, nil - } else { - // 分片上传 - upload, err := c.impl.InitiateMultipartUpload(ctx, c.HashPath(hash)) + } + + // 分片上传 + upload, err := c.impl.InitiateMultipartUpload(ctx, c.HashPath(hash)) + if err != nil { + return nil, err + } + if maxParts < 0 { + maxParts = partNumber + } + var authSign *s3.AuthSignResult + if maxParts > 0 { + partNumbers := make([]int, partNumber) + for i := 0; i < maxParts; i++ { + partNumbers[i] = i + 1 + } + authSign, err = c.impl.AuthSign(ctx, upload.UploadID, upload.Key, time.Hour*24, partNumbers) if err != nil { return nil, err } - if maxParts < 0 { - maxParts = partNumber - } - var authSign *s3.AuthSignResult - if maxParts > 0 { - partNumbers := make([]int, partNumber) - for i := 0; i < maxParts; i++ { - partNumbers[i] = i + 1 - } - authSign, err = c.impl.AuthSign(ctx, upload.UploadID, upload.Key, time.Hour*24, partNumbers) - if err != nil { - return nil, err - } - } - return &InitiateUploadResult{ - UploadID: newMultipartUploadID(multipartUploadID{ - Type: UploadTypeMultipart, - ID: upload.UploadID, - Key: upload.Key, - Size: size, - Hash: hash, - }), - PartSize: partSize, - Sign: authSign, - }, nil } + + return &InitiateUploadResult{ + UploadID: newMultipartUploadID(multipartUploadID{ + Type: UploadTypeMultipart, + ID: upload.UploadID, + Key: upload.Key, + Size: size, + Hash: hash, + }), + PartSize: partSize, + Sign: authSign, + }, nil } func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHashs []string) (*UploadResult, error) { @@ -164,8 +171,10 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa if err != nil { return nil, err } + //nolint:gosec //tofix G401: Use of weak cryptographic primitive if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash { fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash) + return nil, errors.New("md5 mismatching") } if info, err := c.impl.StatObject(ctx, c.HashPath(upload.Hash)); err == nil { @@ -193,7 +202,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa ETag: part, } } - // todo: 验证大小 + // todo: verify size result, err := c.impl.CompleteMultipartUpload(ctx, upload.ID, upload.Key, parts) if err != nil { return nil, err @@ -208,11 +217,12 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa if uploadInfo.Size != upload.Size { return nil, errors.New("upload size mismatching") } + //nolint:gosec //G401: Use of weak cryptographic primitive md5Sum := md5.Sum([]byte(strings.Join([]string{uploadInfo.ETag}, partSeparator))) if md5val := hex.EncodeToString(md5Sum[:]); md5val != upload.Hash { return nil, errs.ErrArgs.Wrap(fmt.Sprintf("md5 mismatching %s != %s", md5val, upload.Hash)) } - // 防止在这个时候,并发操作,导致文件被覆盖 + // Prevent concurrent operations at this time to avoid file overwrite copyInfo, err := c.impl.CopyObject(ctx, uploadInfo.Key, upload.Key+"."+c.UUID()) if err != nil { return nil, err @@ -230,6 +240,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa default: return nil, errors.New("invalid upload id type") } + return &UploadResult{ Key: targetKey, Size: upload.Size, @@ -261,5 +272,6 @@ func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Dur opt.Filename = "" opt.ContentType = "" } + return c.impl.AccessURL(ctx, name, expire, opt) } diff --git a/pkg/common/db/s3/cont/id.go b/pkg/common/db/s3/cont/id.go index 47f37d4aa..a2b723b83 100644 --- a/pkg/common/db/s3/cont/id.go +++ b/pkg/common/db/s3/cont/id.go @@ -33,6 +33,7 @@ func newMultipartUploadID(id multipartUploadID) string { if err != nil { panic(err) } + return base64.StdEncoding.EncodeToString(data) } @@ -45,5 +46,6 @@ func parseMultipartUploadID(id string) (*multipartUploadID, error) { if err := json.Unmarshal(data, &upload); err != nil { return nil, fmt.Errorf("invalid multipart upload id: %w", err) } + return &upload, nil } diff --git a/pkg/common/db/s3/cos/cos.go b/pkg/common/db/s3/cos/cos.go index 7add88487..5484778a5 100644 --- a/pkg/common/db/s3/cos/cos.go +++ b/pkg/common/db/s3/cos/cos.go @@ -44,11 +44,6 @@ const ( imageWebp = "webp" ) -const ( - videoSnapshotImagePng = "png" - videoSnapshotImageJpg = "jpg" -) - func NewCos() (s3.Interface, error) { conf := config.Config.Object.Cos u, err := url.Parse(conf.BucketURL) @@ -62,6 +57,7 @@ func NewCos() (s3.Interface, error) { SessionToken: conf.SessionToken, }, }) + return &Cos{ copyURL: u.Host + "/", client: client, @@ -92,6 +88,7 @@ func (c *Cos) InitiateMultipartUpload(ctx context.Context, name string) (*s3.Ini if err != nil { return nil, err } + return &s3.InitiateMultipartUploadResult{ UploadID: result.UploadID, Bucket: result.Bucket, @@ -113,6 +110,7 @@ func (c *Cos) CompleteMultipartUpload(ctx context.Context, uploadID string, name if err != nil { return nil, err } + return &s3.CompleteMultipartUploadResult{ Location: result.Location, Bucket: result.Bucket, @@ -135,6 +133,7 @@ func (c *Cos) PartSize(ctx context.Context, size int64) (int64, error) { if size%maxNumSize != 0 { partSize++ } + return partSize, nil } @@ -157,6 +156,7 @@ func (c *Cos) AuthSign(ctx context.Context, uploadID string, name string, expire Query: url.Values{"partNumber": {strconv.Itoa(partNumber)}}, } } + return &result, nil } @@ -165,11 +165,13 @@ func (c *Cos) PresignedPutObject(ctx context.Context, name string, expire time.D if err != nil { return "", err } + return rawURL.String(), nil } func (c *Cos) DeleteObject(ctx context.Context, name string) error { _, err := c.client.Object.Delete(ctx, name) + return err } @@ -185,25 +187,26 @@ func (c *Cos) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, erro if res.ETag = strings.ToLower(strings.ReplaceAll(info.Header.Get("ETag"), `"`, "")); res.ETag == "" { return nil, errors.New("StatObject etag not found") } - if contentLengthStr := info.Header.Get("Content-Length"); contentLengthStr == "" { + contentLengthStr := info.Header.Get("Content-Length") + if contentLengthStr == "" { return nil, errors.New("StatObject content-length not found") - } else { - res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64) - if err != nil { - return nil, fmt.Errorf("StatObject content-length parse error: %w", err) - } - if res.Size < 0 { - return nil, errors.New("StatObject content-length must be greater than 0") - } } - if lastModified := info.Header.Get("Last-Modified"); lastModified == "" { + res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("StatObject content-length parse error: %w", err) + } + if res.Size < 0 { + return nil, errors.New("StatObject content-length must be greater than 0") + } + lastModified := info.Header.Get("Last-Modified") + if lastModified == "" { return nil, errors.New("StatObject last-modified not found") - } else { - res.LastModified, err = time.Parse(http.TimeFormat, lastModified) - if err != nil { - return nil, fmt.Errorf("StatObject last-modified parse error: %w", err) - } } + res.LastModified, err = time.Parse(http.TimeFormat, lastModified) + if err != nil { + return nil, fmt.Errorf("StatObject last-modified parse error: %w", err) + } + return res, nil } @@ -213,6 +216,7 @@ func (c *Cos) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO if err != nil { return nil, err } + return &s3.CopyObjectInfo{ Key: dst, ETag: strings.ReplaceAll(result.ETag, `"`, ``), @@ -220,16 +224,17 @@ func (c *Cos) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO } func (c *Cos) IsNotFound(err error) bool { - switch e := err.(type) { - case *cos.ErrorResponse: - return e.Response.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey" - default: - return false + var cosErr *cos.ErrorResponse + if errors.As(err, &cosErr) { + return cosErr.Response.StatusCode == http.StatusNotFound || cosErr.Code == "NoSuchKey" } + + return false } func (c *Cos) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error { _, err := c.client.Object.AbortMultipartUpload(ctx, name, uploadID) + return err } @@ -257,46 +262,59 @@ func (c *Cos) ListUploadedParts(ctx context.Context, uploadID string, name strin Size: part.Size, } } + return res, nil } func (c *Cos) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) { var imageMogr string var option cos.PresignedURLOptions - if opt != nil { - query := make(url.Values) - if opt.Image != nil { - // https://cloud.tencent.com/document/product/436/44880 - style := make([]string, 0, 2) - wh := make([]string, 2) - if opt.Image.Width > 0 { - wh[0] = strconv.Itoa(opt.Image.Width) - } - if opt.Image.Height > 0 { - wh[1] = strconv.Itoa(opt.Image.Height) - } - if opt.Image.Width > 0 || opt.Image.Height > 0 { - style = append(style, strings.Join(wh, "x")) - } - switch opt.Image.Format { - case - imagePng, - imageJpg, - imageJpeg, - imageGif, - imageWebp: - style = append(style, "format/"+opt.Image.Format) - } - if len(style) > 0 { - imageMogr = "imageMogr2/thumbnail/" + strings.Join(style, "/") + "/ignore-error/1" - } + getImageMogr := func(opt *s3.AccessURLOption) (imageMogr string) { + if opt.Image == nil { + return imageMogr + } + // https://cloud.tencent.com/document/product/436/44880 + style := make([]string, 0, 2) + wh := make([]string, 2) + if opt.Image.Width > 0 { + wh[0] = strconv.Itoa(opt.Image.Width) + } + if opt.Image.Height > 0 { + wh[1] = strconv.Itoa(opt.Image.Height) + } + if opt.Image.Width > 0 || opt.Image.Height > 0 { + style = append(style, strings.Join(wh, "x")) + } + switch opt.Image.Format { + case + imagePng, + imageJpg, + imageJpeg, + imageGif, + imageWebp: + style = append(style, "format/"+opt.Image.Format) } + if len(style) > 0 { + imageMogr = "imageMogr2/thumbnail/" + strings.Join(style, "/") + "/ignore-error/1" + } + + return imageMogr + } + getQuery := func(opt *s3.AccessURLOption) (query url.Values) { + query = make(url.Values) if opt.ContentType != "" { query.Set("response-content-type", opt.ContentType) } if opt.Filename != "" { query.Set("response-content-disposition", `attachment; filename=`+strconv.Quote(opt.Filename)) } + + return query + } + + if opt != nil { + imageMogr = getImageMogr(opt) + query := getQuery(opt) if len(query) > 0 { option.Query = &query } @@ -317,6 +335,7 @@ func (c *Cos) AccessURL(ctx context.Context, name string, expire time.Duration, rawURL.RawQuery = rawURL.RawQuery + "&" + imageMogr } } + return rawURL.String(), nil } @@ -324,5 +343,6 @@ func (c *Cos) getPresignedURL(ctx context.Context, name string, expire time.Dura if !config.Config.Object.Cos.PublicRead { return c.client.Object.GetPresignedURL(ctx, http.MethodGet, name, c.credential.SecretID, c.credential.SecretKey, expire, opt) } + return c.client.Object.GetObjectURL(name), nil } diff --git a/pkg/common/db/s3/minio/image.go b/pkg/common/db/s3/minio/image.go index 71db1ea51..4812f47ca 100644 --- a/pkg/common/db/s3/minio/image.go +++ b/pkg/common/db/s3/minio/image.go @@ -39,6 +39,7 @@ func ImageStat(reader io.Reader) (image.Image, string, error) { func ImageWidthHeight(img image.Image) (int, int) { bounds := img.Bounds().Max + return bounds.X, bounds.Y } @@ -47,27 +48,27 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image { imgWidth := bounds.Max.X imgHeight := bounds.Max.Y - // 计算缩放比例 + // Calculate scaling ratio scaleWidth := float64(maxWidth) / float64(imgWidth) scaleHeight := float64(maxHeight) / float64(imgHeight) - // 如果都为0,则不缩放,返回原始图片 + // If both maxWidth and maxHeight are 0, return the original image if maxWidth == 0 && maxHeight == 0 { return img } - // 如果宽度和高度都大于0,则选择较小的缩放比例,以保持宽高比 + // If both maxWidth and maxHeight are greater than 0, choose the smaller scaling ratio to maintain aspect ratio if maxWidth > 0 && maxHeight > 0 { scale := scaleWidth if scaleHeight < scaleWidth { scale = scaleHeight } - // 计算缩略图尺寸 + // Calculate thumbnail size thumbnailWidth := int(float64(imgWidth) * scale) thumbnailHeight := int(float64(imgHeight) * scale) - // 使用"image"库的Resample方法生成缩略图 + // Generate thumbnail using the Resample method of the "image" library thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight)) for y := 0; y < thumbnailHeight; y++ { for x := 0; x < thumbnailWidth; x++ { @@ -80,12 +81,12 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image { return thumbnail } - // 如果只指定了宽度或高度,则根据最大不超过的规则生成缩略图 + // If only maxWidth or maxHeight is specified, generate thumbnail according to the "max not exceed" rule if maxWidth > 0 { thumbnailWidth := maxWidth thumbnailHeight := int(float64(imgHeight) * scaleWidth) - // 使用"image"库的Resample方法生成缩略图 + // Generate thumbnail using the Resample method of the "image" library thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight)) for y := 0; y < thumbnailHeight; y++ { for x := 0; x < thumbnailWidth; x++ { @@ -102,7 +103,7 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image { thumbnailWidth := int(float64(imgWidth) * scaleHeight) thumbnailHeight := maxHeight - // 使用"image"库的Resample方法生成缩略图 + // Generate thumbnail using the Resample method of the "image" library thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight)) for y := 0; y < thumbnailHeight; y++ { for x := 0; x < thumbnailWidth; x++ { @@ -115,6 +116,6 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image { return thumbnail } - // 默认情况下,返回原始图片 + // By default, return the original image return img } diff --git a/pkg/common/db/s3/minio/minio.go b/pkg/common/db/s3/minio/minio.go index 7984df5a0..a84b8c3f7 100644 --- a/pkg/common/db/s3/minio/minio.go +++ b/pkg/common/db/s3/minio/minio.go @@ -111,6 +111,7 @@ func NewMinio() (s3.Interface, error) { if err := m.initMinio(ctx); err != nil { fmt.Println("init minio error:", err) } + return m, nil } @@ -141,8 +142,9 @@ func (m *Minio) initMinio(ctx context.Context) error { return fmt.Errorf("check bucket exists error: %w", err) } if !exists { - if err := m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}); err != nil { - return fmt.Errorf("make bucket error: %w", err) + err2 := m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}) + if err2 != nil { + return fmt.Errorf("make bucket error: %w", err2) } } if conf.PublicRead { @@ -150,8 +152,9 @@ func (m *Minio) initMinio(ctx context.Context) error { `{"Version": "2012-10-17","Statement": [{"Action": ["s3:GetObject","s3:PutObject"],"Effect": "Allow","Principal": {"AWS": ["*"]},"Resource": ["arn:aws:s3:::%s/*"],"Sid": ""}]}`, conf.Bucket, ) - if err := m.core.Client.SetBucketPolicy(ctx, conf.Bucket, policy); err != nil { - return err + err2 := m.core.Client.SetBucketPolicy(ctx, conf.Bucket, policy) + if err2 != nil { + return err2 } } m.location, err = m.core.Client.GetBucketLocation(ctx, conf.Bucket) @@ -182,6 +185,7 @@ func (m *Minio) initMinio(ctx context.Context) error { vblc.Elem().Elem().Interface().(interface{ Set(string, string) }).Set(conf.Bucket, m.location) }() m.init = true + return nil } @@ -205,6 +209,7 @@ func (m *Minio) InitiateMultipartUpload(ctx context.Context, name string) (*s3.I if err != nil { return nil, err } + return &s3.InitiateMultipartUploadResult{ Bucket: m.bucket, Key: name, @@ -227,6 +232,7 @@ func (m *Minio) CompleteMultipartUpload(ctx context.Context, uploadID string, na if err != nil { return nil, err } + return &s3.CompleteMultipartUploadResult{ Location: upload.Location, Bucket: upload.Bucket, @@ -249,6 +255,7 @@ func (m *Minio) PartSize(ctx context.Context, size int64) (int64, error) { if size%maxNumSize != 0 { partSize++ } + return partSize, nil } @@ -282,6 +289,7 @@ func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expi if m.prefix != "" { result.URL = m.signEndpoint + m.prefix + "/" + m.bucket + "/" + name } + return &result, nil } @@ -296,6 +304,7 @@ func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time if m.prefix != "" { rawURL.Path = path.Join(m.prefix, rawURL.Path) } + return rawURL.String(), nil } @@ -303,6 +312,7 @@ func (m *Minio) DeleteObject(ctx context.Context, name string) error { if err := m.initMinio(ctx); err != nil { return err } + return m.core.Client.RemoveObject(ctx, m.bucket, name, minio.RemoveObjectOptions{}) } @@ -314,6 +324,7 @@ func (m *Minio) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, er if err != nil { return nil, err } + return &s3.ObjectInfo{ ETag: strings.ToLower(info.ETag), Key: info.Key, @@ -336,6 +347,7 @@ func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.Cop if err != nil { return nil, err } + return &s3.CopyObjectInfo{ Key: dst, ETag: strings.ToLower(result.ETag), @@ -346,20 +358,23 @@ func (m *Minio) IsNotFound(err error) bool { if err == nil { return false } - switch e := err.(type) { - case minio.ErrorResponse: - return e.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey" - case *minio.ErrorResponse: - return e.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey" - default: - return false + var minioErr minio.ErrorResponse + if errors.As(err, &minio.ErrorResponse{}) { + return minioErr.StatusCode == http.StatusNotFound || minioErr.Code == "NoSuchKey" + } + var minioErr2 *minio.ErrorResponse + if errors.As(err, &minioErr2) { + return minioErr2.StatusCode == http.StatusNotFound || minioErr2.Code == "NoSuchKey" } + + return false } func (m *Minio) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error { if err := m.initMinio(ctx); err != nil { return err } + return m.core.AbortMultipartUpload(ctx, m.bucket, name, uploadID) } @@ -386,6 +401,7 @@ func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name str Size: part.Size, } } + return res, nil } @@ -410,14 +426,11 @@ func (m *Minio) presignedGetObject(ctx context.Context, name string, expire time if m.prefix != "" { rawURL.Path = path.Join(m.prefix, rawURL.Path) } + return rawURL.String(), nil } -func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) { - if err := m.initMinio(ctx); err != nil { - return "", err - } - reqParams := make(url.Values) +func (m *Minio) getImageInfoForAccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption, reqParams url.Values) (fileInfo *s3.ObjectInfo, objectInfoPath, msg string, err error) { if opt != nil { if opt.ContentType != "" { reqParams.Set("response-content-type", opt.ContentType) @@ -427,35 +440,47 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration } } if opt.Image == nil || (opt.Image.Width < 0 && opt.Image.Height < 0 && opt.Image.Format == "") || (opt.Image.Width > maxImageWidth || opt.Image.Height > maxImageHeight) { - return m.presignedGetObject(ctx, name, expire, reqParams) + msg, err = m.presignedGetObject(ctx, name, expire, reqParams) + + return nil, "", msg, err } - fileInfo, err := m.StatObject(ctx, name) + fileInfo, err = m.StatObject(ctx, name) + objectInfoPath = path.Join(pathInfo, fileInfo.ETag, "image.json") if err != nil { - return "", err + return nil, "", msg, err } if fileInfo.Size > maxImageSize { - return "", errors.New("file size too large") + return nil, "", "", errors.New("file size too large") } - objectInfoPath := path.Join(pathInfo, fileInfo.ETag, "image.json") - var ( - img image.Image - info minioImageInfo - ) - data, err := m.getObjectData(ctx, objectInfoPath, 1024) + + return fileInfo, objectInfoPath, "", nil +} + +func (m *Minio) loadImgDataForAccessURL(objectInfoPath string, ctx context.Context, name string, info *minioImageInfo) (img image.Image, msg string, err error) { + var data []byte + data, err = m.getObjectData(ctx, objectInfoPath, 1024) + + //nolint:nestif //easy enough to understand if err == nil { - if err := json.Unmarshal(data, &info); err != nil { - return "", fmt.Errorf("unmarshal minio image info.json error: %w", err) + err = json.Unmarshal(data, &info) + if err != nil { + return nil, "", fmt.Errorf("unmarshal minio image info.json error: %w", err) } if info.NotImage { - return "", errors.New("not image") + return nil, "", errors.New("not image") } } else if m.IsNotFound(err) { - reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) + var reader *minio.Object + reader, err = m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) if err != nil { - return "", err + return img, msg, err } defer reader.Close() - imageInfo, format, err := ImageStat(reader) + var ( + imageInfo image.Image + format string + ) + imageInfo, format, err = ImageStat(reader) if err == nil { info.NotImage = false info.Format = format @@ -464,16 +489,22 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration } else { info.NotImage = true } - data, err := json.Marshal(&info) + + data, err = json.Marshal(&info) if err != nil { - return "", err + return img, msg, err } - if _, err := m.core.Client.PutObject(ctx, m.bucket, objectInfoPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}); err != nil { - return "", err + + _, err = m.core.Client.PutObject(ctx, m.bucket, objectInfoPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}) + if err != nil { + return img, msg, err } - } else { - return "", err } + + return img, msg, err +} + +func (m *Minio) formatImgInfoForAccessURL(opt *s3.AccessURLOption, info *minioImageInfo, reqParams url.Values) { if opt.Image.Width > info.Width || opt.Image.Width <= 0 { opt.Image.Width = info.Width } @@ -496,24 +527,24 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration } } reqParams.Set("response-content-type", "image/"+opt.Image.Format) - if opt.Image.Width == info.Width && opt.Image.Height == info.Height && opt.Image.Format == info.Format { - return m.presignedGetObject(ctx, name, expire, reqParams) - } - cacheKey := filepath.Join(pathInfo, fileInfo.ETag, fmt.Sprintf("image_w%d_h%d.%s", opt.Image.Width, opt.Image.Height, opt.Image.Format)) - if _, err := m.core.Client.StatObject(ctx, m.bucket, cacheKey, minio.StatObjectOptions{}); err == nil { +} + +func (m *Minio) cacheImgInfoForAccessURL(ctx context.Context, name, cacheKey string, img image.Image, expire time.Duration, opt *s3.AccessURLOption, reqParams url.Values) (string, error) { + _, err := m.core.Client.StatObject(ctx, m.bucket, cacheKey, minio.StatObjectOptions{}) + if err == nil { return m.presignedGetObject(ctx, cacheKey, expire, reqParams) } else if !m.IsNotFound(err) { return "", err } if img == nil { - reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) - if err != nil { - return "", err + reader, err2 := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) + if err2 != nil { + return "", err2 } defer reader.Close() - img, _, err = ImageStat(reader) - if err != nil { - return "", err + img, _, err2 = ImageStat(reader) + if err2 != nil { + return "", err2 } } thumbnail := resizeImage(img, opt.Image.Width, opt.Image.Height) @@ -526,9 +557,48 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration case formatGif: err = gif.Encode(buf, thumbnail, nil) } + if err != nil { + return "", err + } if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil { return "", err } + + return "", nil +} + +func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) { + errInit := m.initMinio(ctx) + if errInit != nil { + return "", errInit + } + reqParams := make(url.Values) + fileInfo, objectInfoPath, msg, err := m.getImageInfoForAccessURL(ctx, name, expire, opt, reqParams) + if err != nil { + return msg, err + } + // load-cache img data + var ( + img image.Image + info minioImageInfo + ) + img, msg, err = m.loadImgDataForAccessURL(objectInfoPath, ctx, name, &info) + if err != nil { + return msg, err + } + // format img info + m.formatImgInfoForAccessURL(opt, &info, reqParams) + // no need resize + if opt.Image.Width == info.Width && opt.Image.Height == info.Height && opt.Image.Format == info.Format { + return m.presignedGetObject(ctx, name, expire, reqParams) + } + // cache img + cacheKey := filepath.Join(pathInfo, fileInfo.ETag, fmt.Sprintf("image_w%d_h%d.%s", opt.Image.Width, opt.Image.Height, opt.Image.Format)) + msg, err = m.cacheImgInfoForAccessURL(ctx, name, cacheKey, img, expire, opt, reqParams) + if err != nil { + return msg, err + } + // return cache img return m.presignedGetObject(ctx, cacheKey, expire, reqParams) } @@ -541,5 +611,6 @@ func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([] if limit < 0 { return io.ReadAll(object) } + return io.ReadAll(io.LimitReader(object, 1024)) } diff --git a/pkg/common/db/s3/oss/oss.go b/pkg/common/db/s3/oss/oss.go old mode 100644 new mode 100755 index 6a728127b..4f7f37497 --- a/pkg/common/db/s3/oss/oss.go +++ b/pkg/common/db/s3/oss/oss.go @@ -45,11 +45,6 @@ const ( imageWebp = "webp" ) -const ( - videoSnapshotImagePng = "png" - videoSnapshotImageJpg = "jpg" -) - func NewOSS() (s3.Interface, error) { conf := config.Config.Object.Oss if conf.BucketURL == "" { @@ -66,6 +61,7 @@ func NewOSS() (s3.Interface, error) { if conf.BucketURL[len(conf.BucketURL)-1] != '/' { conf.BucketURL += "/" } + return &OSS{ bucketURL: conf.BucketURL, bucket: bucket, @@ -98,6 +94,7 @@ func (o *OSS) InitiateMultipartUpload(ctx context.Context, name string) (*s3.Ini if err != nil { return nil, err } + return &s3.InitiateMultipartUploadResult{ UploadID: result.UploadID, Bucket: result.Bucket, @@ -121,6 +118,7 @@ func (o *OSS) CompleteMultipartUpload(ctx context.Context, uploadID string, name if err != nil { return nil, err } + return &s3.CompleteMultipartUploadResult{ Location: result.Location, Bucket: result.Bucket, @@ -143,6 +141,7 @@ func (o *OSS) PartSize(ctx context.Context, size int64) (int64, error) { if size%maxNumSize != 0 { partSize++ } + return partSize, nil } @@ -155,7 +154,7 @@ func (o *OSS) AuthSign(ctx context.Context, uploadID string, name string, expire } for i, partNumber := range partNumbers { rawURL := fmt.Sprintf(`%s%s?partNumber=%d&uploadId=%s`, o.bucketURL, name, partNumber, uploadID) - request, err := http.NewRequest(http.MethodPut, rawURL, nil) + request, err := http.NewRequestWithContext(context.Background(), http.MethodPut, rawURL, nil) if err != nil { return nil, err } @@ -175,6 +174,7 @@ func (o *OSS) AuthSign(ctx context.Context, uploadID string, name string, expire Header: request.Header, } } + return &result, nil } @@ -191,25 +191,26 @@ func (o *OSS) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, erro if res.ETag = strings.ToLower(strings.ReplaceAll(header.Get("ETag"), `"`, ``)); res.ETag == "" { return nil, errors.New("StatObject etag not found") } - if contentLengthStr := header.Get("Content-Length"); contentLengthStr == "" { + contentLengthStr := header.Get("Content-Length") + if contentLengthStr == "" { return nil, errors.New("StatObject content-length not found") - } else { - res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64) - if err != nil { - return nil, fmt.Errorf("StatObject content-length parse error: %w", err) - } - if res.Size < 0 { - return nil, errors.New("StatObject content-length must be greater than 0") - } } - if lastModified := header.Get("Last-Modified"); lastModified == "" { + res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("StatObject content-length parse error: %w", err) + } + if res.Size < 0 { + return nil, errors.New("StatObject content-length must be greater than 0") + } + lastModified := header.Get("Last-Modified") + if lastModified == "" { return nil, errors.New("StatObject last-modified not found") - } else { - res.LastModified, err = time.Parse(http.TimeFormat, lastModified) - if err != nil { - return nil, fmt.Errorf("StatObject last-modified parse error: %w", err) - } } + res.LastModified, err = time.Parse(http.TimeFormat, lastModified) + if err != nil { + return nil, fmt.Errorf("StatObject last-modified parse error: %w", err) + } + return res, nil } @@ -222,6 +223,7 @@ func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO if err != nil { return nil, err } + return &s3.CopyObjectInfo{ Key: dst, ETag: strings.ToLower(strings.ReplaceAll(result.ETag, `"`, ``)), @@ -229,6 +231,7 @@ func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO } func (o *OSS) IsNotFound(err error) bool { + //nolint:errorlint //this is exactly what we want,there is no risk for no wrapped errors switch e := err.(type) { case oss.ServiceError: return e.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey" @@ -271,6 +274,7 @@ func (o *OSS) ListUploadedParts(ctx context.Context, uploadID string, name strin Size: int64(part.Size), } } + return res, nil } @@ -278,39 +282,7 @@ func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration, publicRead := config.Config.Object.Oss.PublicRead var opts []oss.Option if opt != nil { - if opt.Image != nil { - // 文档地址: https://help.aliyun.com/zh/oss/user-guide/resize-images-4?spm=a2c4g.11186623.0.0.4b3b1e4fWW6yji - var format string - switch opt.Image.Format { - case - imagePng, - imageJpg, - imageJpeg, - imageGif, - imageWebp: - format = opt.Image.Format - default: - opt.Image.Format = imageJpg - } - // https://oss-console-img-demo-cn-hangzhou.oss-cn-hangzhou.aliyuncs.com/example.jpg?x-oss-process=image/resize,h_100,m_lfit - process := "image/resize,m_lfit" - if opt.Image.Width > 0 { - process += ",w_" + strconv.Itoa(opt.Image.Width) - } - if opt.Image.Height > 0 { - process += ",h_" + strconv.Itoa(opt.Image.Height) - } - process += ",format," + format - opts = append(opts, oss.Process(process)) - } - if !publicRead { - if opt.ContentType != "" { - opts = append(opts, oss.ResponseContentType(opt.ContentType)) - } - if opt.Filename != "" { - opts = append(opts, oss.ResponseContentDisposition(`attachment; filename=`+strconv.Quote(opt.Filename))) - } - } + opts = optsForAccessURL(opt, opts, publicRead) } if expire <= 0 { expire = time.Hour * 24 * 365 * 99 // 99 years @@ -325,5 +297,44 @@ func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration, return "", err } params := getURLParams(*o.bucket.Client.Conn, rawParams) + return getURL(o.um, o.bucket.BucketName, name, params).String(), nil } + +func optsForAccessURL(opt *s3.AccessURLOption, opts []oss.Option, publicRead bool) []oss.Option { + if opt.Image != nil { + // 文档地址: https://help.aliyun.com/zh/oss/user-guide/resize-images-4?spm=a2c4g.11186623.0.0.4b3b1e4fWW6yji + var format string + switch opt.Image.Format { + case + imagePng, + imageJpg, + imageJpeg, + imageGif, + imageWebp: + format = opt.Image.Format + default: + opt.Image.Format = imageJpg + } + // https://oss-console-img-demo-cn-hangzhou.oss-cn-hangzhou.aliyuncs.com/example.jpg?x-oss-process=image/resize,h_100,m_lfit + process := "image/resize,m_lfit" + if opt.Image.Width > 0 { + process += ",w_" + strconv.Itoa(opt.Image.Width) + } + if opt.Image.Height > 0 { + process += ",h_" + strconv.Itoa(opt.Image.Height) + } + process += ",format," + format + opts = append(opts, oss.Process(process)) + } + if !publicRead { + if opt.ContentType != "" { + opts = append(opts, oss.ResponseContentType(opt.ContentType)) + } + if opt.Filename != "" { + opts = append(opts, oss.ResponseContentDisposition(`attachment; filename=`+strconv.Quote(opt.Filename))) + } + } + + return opts +} diff --git a/pkg/common/db/table/relation/group.go b/pkg/common/db/table/relation/group.go index 6759e0d35..24a75173d 100644 --- a/pkg/common/db/table/relation/group.go +++ b/pkg/common/db/table/relation/group.go @@ -30,7 +30,7 @@ type GroupModel struct { Introduction string `gorm:"column:introduction;size:255" json:"introduction"` FaceURL string `gorm:"column:face_url;size:255" json:"faceURL"` CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"` - Ex string `gorm:"column:ex" json:"ex;size:1024"` + Ex string `gorm:"column:ex;size:1024" json:"ex"` Status int32 `gorm:"column:status"` CreatorUserID string `gorm:"column:creator_user_id;size:64"` GroupType int32 `gorm:"column:group_type"` diff --git a/pkg/common/db/table/relation/utils.go b/pkg/common/db/table/relation/utils.go index c944eae8b..bc2639e1a 100644 --- a/pkg/common/db/table/relation/utils.go +++ b/pkg/common/db/table/relation/utils.go @@ -15,6 +15,8 @@ package relation import ( + "errors" + "gorm.io/gorm" "github.com/OpenIMSDK/tools/utils" @@ -32,5 +34,5 @@ type GroupSimpleUserID struct { } func IsNotFound(err error) bool { - return utils.Unwrap(err) == gorm.ErrRecordNotFound + return errors.Is(utils.Unwrap(err), gorm.ErrRecordNotFound) } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index c95b211a8..542f318ad 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -150,6 +150,7 @@ func (m *MsgDocModel) IsFull() bool { func (m MsgDocModel) GetDocID(conversationID string, seq int64) string { seqSuffix := (seq - 1) / singleGocMsgNum + return m.indexGen(conversationID, seqSuffix) } @@ -164,6 +165,7 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st t[docID] = append(value, seqs[i]) } } + return t } @@ -181,5 +183,6 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkw msgModel.Seq = v exceptionMsg = append(exceptionMsg, msgModel) } + return exceptionMsg } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go old mode 100644 new mode 100755 index 09e3e904e..8a90a2a2c --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -16,6 +16,7 @@ package unrelation import ( "context" + "errors" "fmt" "strings" "time" @@ -44,27 +45,12 @@ type Mongo struct { // NewMongo Initialize MongoDB connection. func NewMongo() (*Mongo, error) { specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) - uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" + // uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" + var uri string if config.Config.Mongo.Uri != "" { uri = config.Config.Mongo.Uri } else { - mongodbHosts := "" - for i, v := range config.Config.Mongo.Address { - if i == len(config.Config.Mongo.Address)-1 { - mongodbHosts += v - } else { - mongodbHosts += v + "," - } - } - if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" { - uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", - config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts, - config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize) - } else { - uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", - mongodbHosts, config.Config.Mongo.Database, - config.Config.Mongo.MaxPoolSize) - } + uri = defaultMongoUriForNewMongo() } fmt.Println("mongo:", uri) var mongoClient *mongo.Client @@ -76,17 +62,41 @@ func NewMongo() (*Mongo, error) { if err == nil { return &Mongo{db: mongoClient}, nil } - if cmdErr, ok := err.(mongo.CommandError); ok { + var cmdErr mongo.CommandError + if errors.As(err, &cmdErr) { if cmdErr.Code == 13 || cmdErr.Code == 18 { return nil, err - } else { - fmt.Printf("Failed to connect to MongoDB: %s\n", err) } + fmt.Printf("Failed to connect to MongoDB: %s\n", err) } } + return nil, err } +func defaultMongoUriForNewMongo() string { + var uri string + mongodbHosts := "" + for i, v := range config.Config.Mongo.Address { + if i == len(config.Config.Mongo.Address)-1 { + mongodbHosts += v + } else { + mongodbHosts += v + "," + } + } + if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" { + uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", + config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts, + config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize) + } else { + uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", + mongodbHosts, config.Config.Mongo.Database, + config.Config.Mongo.MaxPoolSize) + } + + return uri +} + func (m *Mongo) GetClient() *mongo.Client { return m.db } @@ -106,6 +116,7 @@ func (m *Mongo) CreateSuperGroupIndex() error { if err := m.createMongoIndex(unrelation.CUserToSuperGroup, true, "user_id"); err != nil { return err } + return nil } @@ -139,5 +150,6 @@ func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...strin if err != nil { return utils.Wrap(err, result) } + return nil } diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go old mode 100644 new mode 100755 index 9b461dd1f..afa2f81e4 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -49,6 +49,7 @@ type MsgMongoDriver struct { func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface { collection := database.Collection(table.MsgDocModel{}.TableName()) + return &MsgMongoDriver{MsgCollection: collection} } @@ -59,6 +60,7 @@ func (m *MsgMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsTo func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) error { _, err := m.MsgCollection.InsertOne(ctx, model) + return err } @@ -81,6 +83,7 @@ func (m *MsgMongoDriver) UpdateMsg( if err != nil { return nil, utils.Wrap(err, "") } + return res, nil } @@ -108,6 +111,7 @@ func (m *MsgMongoDriver) PushUnique( if err != nil { return nil, utils.Wrap(err, "") } + return res, nil } @@ -120,6 +124,7 @@ func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, ind if err != nil { return utils.Wrap(err, "") } + return nil } @@ -143,12 +148,14 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc( if err != nil { return utils.Wrap(err, "") } + return nil } func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*table.MsgDocModel, error) { doc := &table.MsgDocModel{} err := m.MsgCollection.FindOne(ctx, bson.M{"doc_id": docID}).Decode(doc) + return doc, err } @@ -177,6 +184,7 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex( if len(msgs) > 0 { return &msgs[0], nil } + return nil, ErrMsgListNotExist } @@ -225,6 +233,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st if err != nil { return utils.Wrap(err, "") } + return nil } @@ -233,6 +242,7 @@ func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error return nil } _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": bson.M{"$in": docIDs}}) + return err } @@ -246,6 +256,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( for _, seq := range seqs { indexs = append(indexs, m.model.GetMsgIndex(seq)) } + //nolint:govet //This is already the officially recommended standard practice. pipeline := mongo.Pipeline{ { {"$match", bson.D{ @@ -336,6 +347,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( } msgs = append(msgs, msg) } + return msgs, nil } @@ -344,6 +356,7 @@ func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, if err != nil { return false, errs.Wrap(err) } + return count > 0, nil } @@ -372,6 +385,7 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead( updates = append(updates, updateModel) } _, err := m.MsgCollection.BulkWrite(ctx, updates) + return err } @@ -611,7 +625,39 @@ func (m *MsgMongoDriver) RangeUserSendCount( }, ) } - pipeline := bson.A{ + pipeline := buildPiplineForRangeUserSendCount(or, start, end, sort, pageNumber, showNumber) + cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true)) + if err != nil { + return 0, 0, nil, nil, errs.Wrap(err) + } + defer cur.Close(ctx) + var result []Result + if err = cur.All(ctx, &result); err != nil { + return 0, 0, nil, nil, errs.Wrap(err) + } + if len(result) == 0 { + return 0, 0, nil, nil, errs.Wrap(err) + } + users = make([]*table.UserCount, len(result[0].Users)) + for i, r := range result[0].Users { + users[i] = &table.UserCount{ + UserID: r.UserID, + Count: r.Count, + } + } + dateCount = make(map[string]int64) + for _, r := range result[0].Dates { + dateCount[r.Date] = r.Count + } + + return result[0].MsgCount, result[0].UserCount, users, dateCount, nil +} + +//nolint:funlen // it need to be such long +func buildPiplineForRangeUserSendCount(or bson.A, start time.Time, + end time.Time, sort int, pageNumber, showNumber int32, +) bson.A { + return bson.A{ bson.M{ "$match": bson.M{ "$and": bson.A{ @@ -795,30 +841,6 @@ func (m *MsgMongoDriver) RangeUserSendCount( }, }, } - cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true)) - if err != nil { - return 0, 0, nil, nil, errs.Wrap(err) - } - defer cur.Close(ctx) - var result []Result - if err := cur.All(ctx, &result); err != nil { - return 0, 0, nil, nil, errs.Wrap(err) - } - if len(result) == 0 { - return 0, 0, nil, nil, errs.Wrap(err) - } - users = make([]*table.UserCount, len(result[0].Users)) - for i, r := range result[0].Users { - users[i] = &table.UserCount{ - UserID: r.UserID, - Count: r.Count, - } - } - dateCount = make(map[string]int64) - for _, r := range result[0].Dates { - dateCount[r.Date] = r.Count - } - return result[0].MsgCount, result[0].UserCount, users, dateCount, nil } func (m *MsgMongoDriver) RangeGroupSendCount( @@ -847,7 +869,39 @@ func (m *MsgMongoDriver) RangeGroupSendCount( Count int64 `bson:"count"` } `bson:"dates"` } - pipeline := bson.A{ + pipeline := buildPiplineForRangeGroupSendCount(start, end, sort, pageNumber, showNumber) + cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true)) + if err != nil { + return 0, 0, nil, nil, errs.Wrap(err) + } + defer cur.Close(ctx) + var result []Result + if err = cur.All(ctx, &result); err != nil { + return 0, 0, nil, nil, errs.Wrap(err) + } + if len(result) == 0 { + return 0, 0, nil, nil, errs.Wrap(err) + } + groups = make([]*table.GroupCount, len(result[0].Groups)) + for i, r := range result[0].Groups { + groups[i] = &table.GroupCount{ + GroupID: r.GroupID, + Count: r.Count, + } + } + dateCount = make(map[string]int64) + for _, r := range result[0].Dates { + dateCount[r.Date] = r.Count + } + + return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil +} + +//nolint:funlen //it need to has such length +func buildPiplineForRangeGroupSendCount(start time.Time, + end time.Time, sort int, pageNumber, showNumber int32, +) bson.A { + return bson.A{ bson.M{ "$match": bson.M{ "$and": bson.A{ @@ -1044,30 +1098,6 @@ func (m *MsgMongoDriver) RangeGroupSendCount( }, }, } - cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true)) - if err != nil { - return 0, 0, nil, nil, errs.Wrap(err) - } - defer cur.Close(ctx) - var result []Result - if err := cur.All(ctx, &result); err != nil { - return 0, 0, nil, nil, errs.Wrap(err) - } - if len(result) == 0 { - return 0, 0, nil, nil, errs.Wrap(err) - } - groups = make([]*table.GroupCount, len(result[0].Groups)) - for i, r := range result[0].Groups { - groups[i] = &table.GroupCount{ - GroupID: r.GroupID, - Count: r.Count, - } - } - dateCount = make(map[string]int64) - for _, r := range result[0].Dates { - dateCount[r.Date] = r.Count - } - return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil } func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*table.MsgInfoModel, error) { @@ -1075,6 +1105,7 @@ func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessa if err != nil { return 0, nil, err } + return total, msgs, nil } @@ -1119,7 +1150,7 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa }, }, ) - + //nolint:govet //this is already standard pipe = mongo.Pipeline{ { {"$match", bson.D{ @@ -1214,5 +1245,6 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa } else { msgs = msgs[start:] } + return n, msgs, nil } diff --git a/pkg/common/db/unrelation/msg_convert.go b/pkg/common/db/unrelation/msg_convert.go index 810b4f419..a5b28a5c7 100644 --- a/pkg/common/db/unrelation/msg_convert.go +++ b/pkg/common/db/unrelation/msg_convert.go @@ -31,12 +31,14 @@ func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex}) if err != nil { log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID) + continue } var msgDocs []table.MsgDocModel err = cursor.All(ctx, &msgDocs) if err != nil { log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID) + continue } if len(msgDocs) < 1 { @@ -44,39 +46,45 @@ func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs } log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs)) if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) { - if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil { - log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) - continue - } - var newMsgDocs []interface{} - for _, msgDoc := range msgDocs { - if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { - continue - } - var index int64 - for index < int64(len(msgDoc.Msg)) { - msg := msgDoc.Msg[index] - if msg != nil && msg.Msg != nil { - msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} - end := index + m.model.GetSingleGocMsgNum() - if int(end) >= len(msgDoc.Msg) { - msgDocModel.Msg = msgDoc.Msg[index:] - } else { - msgDocModel.Msg = msgDoc.Msg[index:end] - } - newMsgDocs = append(newMsgDocs, msgDocModel) - index = end - } else { - break - } + convertMsgDocs(m, ctx, msgDocs, conversationID, regex) + } + } +} + +func convertMsgDocs(m *MsgMongoDriver, ctx context.Context, msgDocs []table.MsgDocModel, conversationID string, regex primitive.Regex) { + var err error + if _, err = m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil { + log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) + + return + } + var newMsgDocs []interface{} + for _, msgDoc := range msgDocs { + if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { + continue + } + var index int64 + for index < int64(len(msgDoc.Msg)) { + msg := msgDoc.Msg[index] + if msg != nil && msg.Msg != nil { + msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} + end := index + m.model.GetSingleGocMsgNum() + if int(end) >= len(msgDoc.Msg) { + msgDocModel.Msg = msgDoc.Msg[index:] + } else { + msgDocModel.Msg = msgDoc.Msg[index:end] } - } - _, err = m.MsgCollection.InsertMany(ctx, newMsgDocs) - if err != nil { - log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) + newMsgDocs = append(newMsgDocs, msgDocModel) + index = end } else { - log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) + break } } } + _, err = m.MsgCollection.InsertMany(ctx, newMsgDocs) + if err != nil { + log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) + } else { + log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) + } } diff --git a/pkg/common/db/unrelation/super_group.go b/pkg/common/db/unrelation/super_group.go index c762140a2..7f9aecfd6 100644 --- a/pkg/common/db/unrelation/super_group.go +++ b/pkg/common/db/unrelation/super_group.go @@ -59,6 +59,7 @@ func (s *SuperGroupMongoDriver) CreateSuperGroup(ctx context.Context, groupID st return err } } + return nil } @@ -69,6 +70,7 @@ func (s *SuperGroupMongoDriver) TakeSuperGroup( if err := s.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&group); err != nil { return nil, utils.Wrap(err, "") } + return group, nil } @@ -86,6 +88,7 @@ func (s *SuperGroupMongoDriver) FindSuperGroup( if err := cursor.All(ctx, &groups); err != nil { return nil, utils.Wrap(err, "") } + return groups, nil } @@ -113,6 +116,7 @@ func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID return utils.Wrap(err, "transaction failed") } } + return nil } @@ -129,6 +133,7 @@ func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, g if err != nil { return err } + return nil } @@ -138,6 +143,7 @@ func (s *SuperGroupMongoDriver) GetSuperGroupByUserID( ) (*unrelation.UserToSuperGroupModel, error) { var user unrelation.UserToSuperGroupModel err := s.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) + return &user, utils.Wrap(err, "") } @@ -149,6 +155,7 @@ func (s *SuperGroupMongoDriver) DeleteSuperGroup(ctx context.Context, groupID st if _, err := s.superGroupCollection.DeleteOne(ctx, bson.M{"group_id": groupID}); err != nil { return utils.Wrap(err, "") } + return s.RemoveGroupFromUser(ctx, groupID, group.MemberIDs) } @@ -158,5 +165,6 @@ func (s *SuperGroupMongoDriver) RemoveGroupFromUser(ctx context.Context, groupID bson.M{"user_id": bson.M{"$in": userIDs}}, bson.M{"$pull": bson.M{"group_id_list": groupID}}, ) + return utils.Wrap(err, "") } diff --git a/pkg/common/db/unrelation/user.go b/pkg/common/db/unrelation/user.go old mode 100644 new mode 100755 index 4b4a78c79..ad02968bd --- a/pkg/common/db/unrelation/user.go +++ b/pkg/common/db/unrelation/user.go @@ -16,6 +16,7 @@ package unrelation import ( "context" + "errors" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/utils" @@ -50,6 +51,7 @@ type UserMongoDriver struct { // AddSubscriptionList Subscriber's handling of thresholds. func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error { // Check the number of lists in the key. + //nolint:govet //this has already been the standard format for mongo.Pipeline pipeline := mongo.Pipeline{ {{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}}, {{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}}, @@ -65,7 +67,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string } // iterate over aggregated results for cursor.Next(ctx) { - err := cursor.Decode(&cnt) + err = cursor.Decode(&cnt) if err != nil { return errs.Wrap(err) } @@ -122,6 +124,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string return utils.Wrap(err, "transaction failed") } } + return nil } @@ -139,6 +142,7 @@ func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, if err != nil { return errs.Wrap(err) } + return nil } @@ -152,6 +156,7 @@ func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, user bson.M{"$pull": bson.M{"user_id_list": userID}}, ) } + return errs.Wrap(err) } @@ -163,12 +168,13 @@ func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string bson.M{"user_id": SubscriptionPrefix + userID}) err = cursor.Decode(&user) if err != nil { - if err == mongo.ErrNoDocuments { + if errors.Is(err, mongo.ErrNoDocuments) { return []string{}, nil - } else { - return nil, errs.Wrap(err) } + + return nil, errs.Wrap(err) } + return user.UserIDList, nil } @@ -180,11 +186,12 @@ func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) bson.M{"user_id": SubscribedPrefix + userID}) err = cursor.Decode(&user) if err != nil { - if err == mongo.ErrNoDocuments { + if errors.Is(err, mongo.ErrNoDocuments) { return []string{}, nil - } else { - return nil, errs.Wrap(err) } + + return nil, errs.Wrap(err) } + return user.UserIDList, nil }