fix: fix rest lint errors in pkg/common/db

pull/1263/head
cncsmonster 2 years ago
parent fc1a270057
commit 4bd9acbd6d

@ -46,6 +46,7 @@ func (c *Controller) HashPath(md5 string) string {
func (c *Controller) NowPath() string {
now := time.Now()
return path.Join(
fmt.Sprintf("%04d", now.Year()),
fmt.Sprintf("%02d", now.Month()),
@ -58,6 +59,7 @@ func (c *Controller) NowPath() string {
func (c *Controller) UUID() string {
id := uuid.New()
return hex.EncodeToString(id[:])
}
@ -92,20 +94,24 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64
partNumber++
}
if maxParts > 0 && partNumber > 0 && partNumber < maxParts {
return nil, errors.New(fmt.Sprintf("too many parts: %d", partNumber))
return nil, fmt.Errorf("too few parts: %d", partNumber)
}
if info, err := c.impl.StatObject(ctx, c.HashPath(hash)); err == nil {
info, err := c.impl.StatObject(ctx, c.HashPath(hash))
if err == nil {
return nil, &HashAlreadyExistsError{Object: info}
} else if !c.impl.IsNotFound(err) {
}
if !c.impl.IsNotFound(err) {
return nil, err
}
if size <= partSize {
// 预签名上传
key := path.Join(tempPath, c.NowPath(), fmt.Sprintf("%s_%d_%s.presigned", hash, size, c.UUID()))
rawURL, err := c.impl.PresignedPutObject(ctx, key, expire)
if err != nil {
return nil, err
rawURL, err2 := c.impl.PresignedPutObject(ctx, key, expire)
if err2 != nil {
return nil, err2
}
return &InitiateUploadResult{
UploadID: newMultipartUploadID(multipartUploadID{
Type: UploadTypePresigned,
@ -124,38 +130,39 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64
},
},
}, nil
} else {
// 分片上传
upload, err := c.impl.InitiateMultipartUpload(ctx, c.HashPath(hash))
}
// 分片上传
upload, err := c.impl.InitiateMultipartUpload(ctx, c.HashPath(hash))
if err != nil {
return nil, err
}
if maxParts < 0 {
maxParts = partNumber
}
var authSign *s3.AuthSignResult
if maxParts > 0 {
partNumbers := make([]int, partNumber)
for i := 0; i < maxParts; i++ {
partNumbers[i] = i + 1
}
authSign, err = c.impl.AuthSign(ctx, upload.UploadID, upload.Key, time.Hour*24, partNumbers)
if err != nil {
return nil, err
}
if maxParts < 0 {
maxParts = partNumber
}
var authSign *s3.AuthSignResult
if maxParts > 0 {
partNumbers := make([]int, partNumber)
for i := 0; i < maxParts; i++ {
partNumbers[i] = i + 1
}
authSign, err = c.impl.AuthSign(ctx, upload.UploadID, upload.Key, time.Hour*24, partNumbers)
if err != nil {
return nil, err
}
}
return &InitiateUploadResult{
UploadID: newMultipartUploadID(multipartUploadID{
Type: UploadTypeMultipart,
ID: upload.UploadID,
Key: upload.Key,
Size: size,
Hash: hash,
}),
PartSize: partSize,
Sign: authSign,
}, nil
}
return &InitiateUploadResult{
UploadID: newMultipartUploadID(multipartUploadID{
Type: UploadTypeMultipart,
ID: upload.UploadID,
Key: upload.Key,
Size: size,
Hash: hash,
}),
PartSize: partSize,
Sign: authSign,
}, nil
}
func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHashs []string) (*UploadResult, error) {
@ -164,8 +171,10 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
if err != nil {
return nil, err
}
//nolint:gosec //tofix G401: Use of weak cryptographic primitive
if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash {
fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash)
return nil, errors.New("md5 mismatching")
}
if info, err := c.impl.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
@ -193,7 +202,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
ETag: part,
}
}
// todo: 验证大小
// todo: verify size
result, err := c.impl.CompleteMultipartUpload(ctx, upload.ID, upload.Key, parts)
if err != nil {
return nil, err
@ -208,11 +217,12 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
if uploadInfo.Size != upload.Size {
return nil, errors.New("upload size mismatching")
}
//nolint:gosec //G401: Use of weak cryptographic primitive
md5Sum := md5.Sum([]byte(strings.Join([]string{uploadInfo.ETag}, partSeparator)))
if md5val := hex.EncodeToString(md5Sum[:]); md5val != upload.Hash {
return nil, errs.ErrArgs.Wrap(fmt.Sprintf("md5 mismatching %s != %s", md5val, upload.Hash))
}
// 防止在这个时候,并发操作,导致文件被覆盖
// Prevent concurrent operations at this time to avoid file overwrite
copyInfo, err := c.impl.CopyObject(ctx, uploadInfo.Key, upload.Key+"."+c.UUID())
if err != nil {
return nil, err
@ -230,6 +240,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
default:
return nil, errors.New("invalid upload id type")
}
return &UploadResult{
Key: targetKey,
Size: upload.Size,
@ -261,5 +272,6 @@ func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Dur
opt.Filename = ""
opt.ContentType = ""
}
return c.impl.AccessURL(ctx, name, expire, opt)
}

@ -33,6 +33,7 @@ func newMultipartUploadID(id multipartUploadID) string {
if err != nil {
panic(err)
}
return base64.StdEncoding.EncodeToString(data)
}
@ -45,5 +46,6 @@ func parseMultipartUploadID(id string) (*multipartUploadID, error) {
if err := json.Unmarshal(data, &upload); err != nil {
return nil, fmt.Errorf("invalid multipart upload id: %w", err)
}
return &upload, nil
}

@ -44,11 +44,6 @@ const (
imageWebp = "webp"
)
const (
videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg"
)
func NewCos() (s3.Interface, error) {
conf := config.Config.Object.Cos
u, err := url.Parse(conf.BucketURL)
@ -62,6 +57,7 @@ func NewCos() (s3.Interface, error) {
SessionToken: conf.SessionToken,
},
})
return &Cos{
copyURL: u.Host + "/",
client: client,
@ -92,6 +88,7 @@ func (c *Cos) InitiateMultipartUpload(ctx context.Context, name string) (*s3.Ini
if err != nil {
return nil, err
}
return &s3.InitiateMultipartUploadResult{
UploadID: result.UploadID,
Bucket: result.Bucket,
@ -113,6 +110,7 @@ func (c *Cos) CompleteMultipartUpload(ctx context.Context, uploadID string, name
if err != nil {
return nil, err
}
return &s3.CompleteMultipartUploadResult{
Location: result.Location,
Bucket: result.Bucket,
@ -135,6 +133,7 @@ func (c *Cos) PartSize(ctx context.Context, size int64) (int64, error) {
if size%maxNumSize != 0 {
partSize++
}
return partSize, nil
}
@ -157,6 +156,7 @@ func (c *Cos) AuthSign(ctx context.Context, uploadID string, name string, expire
Query: url.Values{"partNumber": {strconv.Itoa(partNumber)}},
}
}
return &result, nil
}
@ -165,11 +165,13 @@ func (c *Cos) PresignedPutObject(ctx context.Context, name string, expire time.D
if err != nil {
return "", err
}
return rawURL.String(), nil
}
func (c *Cos) DeleteObject(ctx context.Context, name string) error {
_, err := c.client.Object.Delete(ctx, name)
return err
}
@ -185,25 +187,26 @@ func (c *Cos) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, erro
if res.ETag = strings.ToLower(strings.ReplaceAll(info.Header.Get("ETag"), `"`, "")); res.ETag == "" {
return nil, errors.New("StatObject etag not found")
}
if contentLengthStr := info.Header.Get("Content-Length"); contentLengthStr == "" {
contentLengthStr := info.Header.Get("Content-Length")
if contentLengthStr == "" {
return nil, errors.New("StatObject content-length not found")
} else {
res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("StatObject content-length parse error: %w", err)
}
if res.Size < 0 {
return nil, errors.New("StatObject content-length must be greater than 0")
}
}
if lastModified := info.Header.Get("Last-Modified"); lastModified == "" {
res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("StatObject content-length parse error: %w", err)
}
if res.Size < 0 {
return nil, errors.New("StatObject content-length must be greater than 0")
}
lastModified := info.Header.Get("Last-Modified")
if lastModified == "" {
return nil, errors.New("StatObject last-modified not found")
} else {
res.LastModified, err = time.Parse(http.TimeFormat, lastModified)
if err != nil {
return nil, fmt.Errorf("StatObject last-modified parse error: %w", err)
}
}
res.LastModified, err = time.Parse(http.TimeFormat, lastModified)
if err != nil {
return nil, fmt.Errorf("StatObject last-modified parse error: %w", err)
}
return res, nil
}
@ -213,6 +216,7 @@ func (c *Cos) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO
if err != nil {
return nil, err
}
return &s3.CopyObjectInfo{
Key: dst,
ETag: strings.ReplaceAll(result.ETag, `"`, ``),
@ -220,16 +224,17 @@ func (c *Cos) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO
}
func (c *Cos) IsNotFound(err error) bool {
switch e := err.(type) {
case *cos.ErrorResponse:
return e.Response.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey"
default:
return false
var cosErr *cos.ErrorResponse
if errors.As(err, &cosErr) {
return cosErr.Response.StatusCode == http.StatusNotFound || cosErr.Code == "NoSuchKey"
}
return false
}
func (c *Cos) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error {
_, err := c.client.Object.AbortMultipartUpload(ctx, name, uploadID)
return err
}
@ -257,46 +262,59 @@ func (c *Cos) ListUploadedParts(ctx context.Context, uploadID string, name strin
Size: part.Size,
}
}
return res, nil
}
func (c *Cos) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
var imageMogr string
var option cos.PresignedURLOptions
if opt != nil {
query := make(url.Values)
if opt.Image != nil {
// https://cloud.tencent.com/document/product/436/44880
style := make([]string, 0, 2)
wh := make([]string, 2)
if opt.Image.Width > 0 {
wh[0] = strconv.Itoa(opt.Image.Width)
}
if opt.Image.Height > 0 {
wh[1] = strconv.Itoa(opt.Image.Height)
}
if opt.Image.Width > 0 || opt.Image.Height > 0 {
style = append(style, strings.Join(wh, "x"))
}
switch opt.Image.Format {
case
imagePng,
imageJpg,
imageJpeg,
imageGif,
imageWebp:
style = append(style, "format/"+opt.Image.Format)
}
if len(style) > 0 {
imageMogr = "imageMogr2/thumbnail/" + strings.Join(style, "/") + "/ignore-error/1"
}
getImageMogr := func(opt *s3.AccessURLOption) (imageMogr string) {
if opt.Image == nil {
return imageMogr
}
// https://cloud.tencent.com/document/product/436/44880
style := make([]string, 0, 2)
wh := make([]string, 2)
if opt.Image.Width > 0 {
wh[0] = strconv.Itoa(opt.Image.Width)
}
if opt.Image.Height > 0 {
wh[1] = strconv.Itoa(opt.Image.Height)
}
if opt.Image.Width > 0 || opt.Image.Height > 0 {
style = append(style, strings.Join(wh, "x"))
}
switch opt.Image.Format {
case
imagePng,
imageJpg,
imageJpeg,
imageGif,
imageWebp:
style = append(style, "format/"+opt.Image.Format)
}
if len(style) > 0 {
imageMogr = "imageMogr2/thumbnail/" + strings.Join(style, "/") + "/ignore-error/1"
}
return imageMogr
}
getQuery := func(opt *s3.AccessURLOption) (query url.Values) {
query = make(url.Values)
if opt.ContentType != "" {
query.Set("response-content-type", opt.ContentType)
}
if opt.Filename != "" {
query.Set("response-content-disposition", `attachment; filename=`+strconv.Quote(opt.Filename))
}
return query
}
if opt != nil {
imageMogr = getImageMogr(opt)
query := getQuery(opt)
if len(query) > 0 {
option.Query = &query
}
@ -317,6 +335,7 @@ func (c *Cos) AccessURL(ctx context.Context, name string, expire time.Duration,
rawURL.RawQuery = rawURL.RawQuery + "&" + imageMogr
}
}
return rawURL.String(), nil
}
@ -324,5 +343,6 @@ func (c *Cos) getPresignedURL(ctx context.Context, name string, expire time.Dura
if !config.Config.Object.Cos.PublicRead {
return c.client.Object.GetPresignedURL(ctx, http.MethodGet, name, c.credential.SecretID, c.credential.SecretKey, expire, opt)
}
return c.client.Object.GetObjectURL(name), nil
}

@ -39,6 +39,7 @@ func ImageStat(reader io.Reader) (image.Image, string, error) {
func ImageWidthHeight(img image.Image) (int, int) {
bounds := img.Bounds().Max
return bounds.X, bounds.Y
}
@ -47,27 +48,27 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
imgWidth := bounds.Max.X
imgHeight := bounds.Max.Y
// 计算缩放比例
// Calculate scaling ratio
scaleWidth := float64(maxWidth) / float64(imgWidth)
scaleHeight := float64(maxHeight) / float64(imgHeight)
// 如果都为0则不缩放返回原始图片
// If both maxWidth and maxHeight are 0, return the original image
if maxWidth == 0 && maxHeight == 0 {
return img
}
// 如果宽度和高度都大于0则选择较小的缩放比例以保持宽高比
// If both maxWidth and maxHeight are greater than 0, choose the smaller scaling ratio to maintain aspect ratio
if maxWidth > 0 && maxHeight > 0 {
scale := scaleWidth
if scaleHeight < scaleWidth {
scale = scaleHeight
}
// 计算缩略图尺寸
// Calculate thumbnail size
thumbnailWidth := int(float64(imgWidth) * scale)
thumbnailHeight := int(float64(imgHeight) * scale)
// 使用"image"库的Resample方法生成缩略图
// Generate thumbnail using the Resample method of the "image" library
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
for y := 0; y < thumbnailHeight; y++ {
for x := 0; x < thumbnailWidth; x++ {
@ -80,12 +81,12 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
return thumbnail
}
// 如果只指定了宽度或高度,则根据最大不超过的规则生成缩略图
// If only maxWidth or maxHeight is specified, generate thumbnail according to the "max not exceed" rule
if maxWidth > 0 {
thumbnailWidth := maxWidth
thumbnailHeight := int(float64(imgHeight) * scaleWidth)
// 使用"image"库的Resample方法生成缩略图
// Generate thumbnail using the Resample method of the "image" library
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
for y := 0; y < thumbnailHeight; y++ {
for x := 0; x < thumbnailWidth; x++ {
@ -102,7 +103,7 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
thumbnailWidth := int(float64(imgWidth) * scaleHeight)
thumbnailHeight := maxHeight
// 使用"image"库的Resample方法生成缩略图
// Generate thumbnail using the Resample method of the "image" library
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
for y := 0; y < thumbnailHeight; y++ {
for x := 0; x < thumbnailWidth; x++ {
@ -115,6 +116,6 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
return thumbnail
}
// 默认情况下,返回原始图片
// By default, return the original image
return img
}

@ -111,6 +111,7 @@ func NewMinio() (s3.Interface, error) {
if err := m.initMinio(ctx); err != nil {
fmt.Println("init minio error:", err)
}
return m, nil
}
@ -141,8 +142,9 @@ func (m *Minio) initMinio(ctx context.Context) error {
return fmt.Errorf("check bucket exists error: %w", err)
}
if !exists {
if err := m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}); err != nil {
return fmt.Errorf("make bucket error: %w", err)
err2 := m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{})
if err2 != nil {
return fmt.Errorf("make bucket error: %w", err2)
}
}
if conf.PublicRead {
@ -150,8 +152,9 @@ func (m *Minio) initMinio(ctx context.Context) error {
`{"Version": "2012-10-17","Statement": [{"Action": ["s3:GetObject","s3:PutObject"],"Effect": "Allow","Principal": {"AWS": ["*"]},"Resource": ["arn:aws:s3:::%s/*"],"Sid": ""}]}`,
conf.Bucket,
)
if err := m.core.Client.SetBucketPolicy(ctx, conf.Bucket, policy); err != nil {
return err
err2 := m.core.Client.SetBucketPolicy(ctx, conf.Bucket, policy)
if err2 != nil {
return err2
}
}
m.location, err = m.core.Client.GetBucketLocation(ctx, conf.Bucket)
@ -182,6 +185,7 @@ func (m *Minio) initMinio(ctx context.Context) error {
vblc.Elem().Elem().Interface().(interface{ Set(string, string) }).Set(conf.Bucket, m.location)
}()
m.init = true
return nil
}
@ -205,6 +209,7 @@ func (m *Minio) InitiateMultipartUpload(ctx context.Context, name string) (*s3.I
if err != nil {
return nil, err
}
return &s3.InitiateMultipartUploadResult{
Bucket: m.bucket,
Key: name,
@ -227,6 +232,7 @@ func (m *Minio) CompleteMultipartUpload(ctx context.Context, uploadID string, na
if err != nil {
return nil, err
}
return &s3.CompleteMultipartUploadResult{
Location: upload.Location,
Bucket: upload.Bucket,
@ -249,6 +255,7 @@ func (m *Minio) PartSize(ctx context.Context, size int64) (int64, error) {
if size%maxNumSize != 0 {
partSize++
}
return partSize, nil
}
@ -282,6 +289,7 @@ func (m *Minio) AuthSign(ctx context.Context, uploadID string, name string, expi
if m.prefix != "" {
result.URL = m.signEndpoint + m.prefix + "/" + m.bucket + "/" + name
}
return &result, nil
}
@ -296,6 +304,7 @@ func (m *Minio) PresignedPutObject(ctx context.Context, name string, expire time
if m.prefix != "" {
rawURL.Path = path.Join(m.prefix, rawURL.Path)
}
return rawURL.String(), nil
}
@ -303,6 +312,7 @@ func (m *Minio) DeleteObject(ctx context.Context, name string) error {
if err := m.initMinio(ctx); err != nil {
return err
}
return m.core.Client.RemoveObject(ctx, m.bucket, name, minio.RemoveObjectOptions{})
}
@ -314,6 +324,7 @@ func (m *Minio) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, er
if err != nil {
return nil, err
}
return &s3.ObjectInfo{
ETag: strings.ToLower(info.ETag),
Key: info.Key,
@ -336,6 +347,7 @@ func (m *Minio) CopyObject(ctx context.Context, src string, dst string) (*s3.Cop
if err != nil {
return nil, err
}
return &s3.CopyObjectInfo{
Key: dst,
ETag: strings.ToLower(result.ETag),
@ -346,20 +358,23 @@ func (m *Minio) IsNotFound(err error) bool {
if err == nil {
return false
}
switch e := err.(type) {
case minio.ErrorResponse:
return e.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey"
case *minio.ErrorResponse:
return e.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey"
default:
return false
var minioErr minio.ErrorResponse
if errors.As(err, &minio.ErrorResponse{}) {
return minioErr.StatusCode == http.StatusNotFound || minioErr.Code == "NoSuchKey"
}
var minioErr2 *minio.ErrorResponse
if errors.As(err, &minioErr2) {
return minioErr2.StatusCode == http.StatusNotFound || minioErr2.Code == "NoSuchKey"
}
return false
}
func (m *Minio) AbortMultipartUpload(ctx context.Context, uploadID string, name string) error {
if err := m.initMinio(ctx); err != nil {
return err
}
return m.core.AbortMultipartUpload(ctx, m.bucket, name, uploadID)
}
@ -386,6 +401,7 @@ func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name str
Size: part.Size,
}
}
return res, nil
}
@ -410,14 +426,11 @@ func (m *Minio) presignedGetObject(ctx context.Context, name string, expire time
if m.prefix != "" {
rawURL.Path = path.Join(m.prefix, rawURL.Path)
}
return rawURL.String(), nil
}
func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
if err := m.initMinio(ctx); err != nil {
return "", err
}
reqParams := make(url.Values)
func (m *Minio) getImageInfoForAccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption, reqParams url.Values) (fileInfo *s3.ObjectInfo, objectInfoPath, msg string, err error) {
if opt != nil {
if opt.ContentType != "" {
reqParams.Set("response-content-type", opt.ContentType)
@ -427,35 +440,47 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration
}
}
if opt.Image == nil || (opt.Image.Width < 0 && opt.Image.Height < 0 && opt.Image.Format == "") || (opt.Image.Width > maxImageWidth || opt.Image.Height > maxImageHeight) {
return m.presignedGetObject(ctx, name, expire, reqParams)
msg, err = m.presignedGetObject(ctx, name, expire, reqParams)
return nil, "", msg, err
}
fileInfo, err := m.StatObject(ctx, name)
fileInfo, err = m.StatObject(ctx, name)
objectInfoPath = path.Join(pathInfo, fileInfo.ETag, "image.json")
if err != nil {
return "", err
return nil, "", msg, err
}
if fileInfo.Size > maxImageSize {
return "", errors.New("file size too large")
return nil, "", "", errors.New("file size too large")
}
objectInfoPath := path.Join(pathInfo, fileInfo.ETag, "image.json")
var (
img image.Image
info minioImageInfo
)
data, err := m.getObjectData(ctx, objectInfoPath, 1024)
return fileInfo, objectInfoPath, "", nil
}
func (m *Minio) loadImgDataForAccessURL(objectInfoPath string, ctx context.Context, name string, info *minioImageInfo) (img image.Image, msg string, err error) {
var data []byte
data, err = m.getObjectData(ctx, objectInfoPath, 1024)
//nolint:nestif //easy enough to understand
if err == nil {
if err := json.Unmarshal(data, &info); err != nil {
return "", fmt.Errorf("unmarshal minio image info.json error: %w", err)
err = json.Unmarshal(data, &info)
if err != nil {
return nil, "", fmt.Errorf("unmarshal minio image info.json error: %w", err)
}
if info.NotImage {
return "", errors.New("not image")
return nil, "", errors.New("not image")
}
} else if m.IsNotFound(err) {
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
var reader *minio.Object
reader, err = m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
if err != nil {
return "", err
return img, msg, err
}
defer reader.Close()
imageInfo, format, err := ImageStat(reader)
var (
imageInfo image.Image
format string
)
imageInfo, format, err = ImageStat(reader)
if err == nil {
info.NotImage = false
info.Format = format
@ -464,16 +489,22 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration
} else {
info.NotImage = true
}
data, err := json.Marshal(&info)
data, err = json.Marshal(&info)
if err != nil {
return "", err
return img, msg, err
}
if _, err := m.core.Client.PutObject(ctx, m.bucket, objectInfoPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}); err != nil {
return "", err
_, err = m.core.Client.PutObject(ctx, m.bucket, objectInfoPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
if err != nil {
return img, msg, err
}
} else {
return "", err
}
return img, msg, err
}
func (m *Minio) formatImgInfoForAccessURL(opt *s3.AccessURLOption, info *minioImageInfo, reqParams url.Values) {
if opt.Image.Width > info.Width || opt.Image.Width <= 0 {
opt.Image.Width = info.Width
}
@ -496,24 +527,24 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration
}
}
reqParams.Set("response-content-type", "image/"+opt.Image.Format)
if opt.Image.Width == info.Width && opt.Image.Height == info.Height && opt.Image.Format == info.Format {
return m.presignedGetObject(ctx, name, expire, reqParams)
}
cacheKey := filepath.Join(pathInfo, fileInfo.ETag, fmt.Sprintf("image_w%d_h%d.%s", opt.Image.Width, opt.Image.Height, opt.Image.Format))
if _, err := m.core.Client.StatObject(ctx, m.bucket, cacheKey, minio.StatObjectOptions{}); err == nil {
}
func (m *Minio) cacheImgInfoForAccessURL(ctx context.Context, name, cacheKey string, img image.Image, expire time.Duration, opt *s3.AccessURLOption, reqParams url.Values) (string, error) {
_, err := m.core.Client.StatObject(ctx, m.bucket, cacheKey, minio.StatObjectOptions{})
if err == nil {
return m.presignedGetObject(ctx, cacheKey, expire, reqParams)
} else if !m.IsNotFound(err) {
return "", err
}
if img == nil {
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
if err != nil {
return "", err
reader, err2 := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
if err2 != nil {
return "", err2
}
defer reader.Close()
img, _, err = ImageStat(reader)
if err != nil {
return "", err
img, _, err2 = ImageStat(reader)
if err2 != nil {
return "", err2
}
}
thumbnail := resizeImage(img, opt.Image.Width, opt.Image.Height)
@ -526,9 +557,48 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration
case formatGif:
err = gif.Encode(buf, thumbnail, nil)
}
if err != nil {
return "", err
}
if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil {
return "", err
}
return "", nil
}
func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {
errInit := m.initMinio(ctx)
if errInit != nil {
return "", errInit
}
reqParams := make(url.Values)
fileInfo, objectInfoPath, msg, err := m.getImageInfoForAccessURL(ctx, name, expire, opt, reqParams)
if err != nil {
return msg, err
}
// load-cache img data
var (
img image.Image
info minioImageInfo
)
img, msg, err = m.loadImgDataForAccessURL(objectInfoPath, ctx, name, &info)
if err != nil {
return msg, err
}
// format img info
m.formatImgInfoForAccessURL(opt, &info, reqParams)
// no need resize
if opt.Image.Width == info.Width && opt.Image.Height == info.Height && opt.Image.Format == info.Format {
return m.presignedGetObject(ctx, name, expire, reqParams)
}
// cache img
cacheKey := filepath.Join(pathInfo, fileInfo.ETag, fmt.Sprintf("image_w%d_h%d.%s", opt.Image.Width, opt.Image.Height, opt.Image.Format))
msg, err = m.cacheImgInfoForAccessURL(ctx, name, cacheKey, img, expire, opt, reqParams)
if err != nil {
return msg, err
}
// return cache img
return m.presignedGetObject(ctx, cacheKey, expire, reqParams)
}
@ -541,5 +611,6 @@ func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([]
if limit < 0 {
return io.ReadAll(object)
}
return io.ReadAll(io.LimitReader(object, 1024))
}

@ -45,11 +45,6 @@ const (
imageWebp = "webp"
)
const (
videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg"
)
func NewOSS() (s3.Interface, error) {
conf := config.Config.Object.Oss
if conf.BucketURL == "" {
@ -66,6 +61,7 @@ func NewOSS() (s3.Interface, error) {
if conf.BucketURL[len(conf.BucketURL)-1] != '/' {
conf.BucketURL += "/"
}
return &OSS{
bucketURL: conf.BucketURL,
bucket: bucket,
@ -98,6 +94,7 @@ func (o *OSS) InitiateMultipartUpload(ctx context.Context, name string) (*s3.Ini
if err != nil {
return nil, err
}
return &s3.InitiateMultipartUploadResult{
UploadID: result.UploadID,
Bucket: result.Bucket,
@ -121,6 +118,7 @@ func (o *OSS) CompleteMultipartUpload(ctx context.Context, uploadID string, name
if err != nil {
return nil, err
}
return &s3.CompleteMultipartUploadResult{
Location: result.Location,
Bucket: result.Bucket,
@ -143,6 +141,7 @@ func (o *OSS) PartSize(ctx context.Context, size int64) (int64, error) {
if size%maxNumSize != 0 {
partSize++
}
return partSize, nil
}
@ -155,7 +154,7 @@ func (o *OSS) AuthSign(ctx context.Context, uploadID string, name string, expire
}
for i, partNumber := range partNumbers {
rawURL := fmt.Sprintf(`%s%s?partNumber=%d&uploadId=%s`, o.bucketURL, name, partNumber, uploadID)
request, err := http.NewRequest(http.MethodPut, rawURL, nil)
request, err := http.NewRequestWithContext(context.Background(), http.MethodPut, rawURL, nil)
if err != nil {
return nil, err
}
@ -175,6 +174,7 @@ func (o *OSS) AuthSign(ctx context.Context, uploadID string, name string, expire
Header: request.Header,
}
}
return &result, nil
}
@ -191,25 +191,26 @@ func (o *OSS) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, erro
if res.ETag = strings.ToLower(strings.ReplaceAll(header.Get("ETag"), `"`, ``)); res.ETag == "" {
return nil, errors.New("StatObject etag not found")
}
if contentLengthStr := header.Get("Content-Length"); contentLengthStr == "" {
contentLengthStr := header.Get("Content-Length")
if contentLengthStr == "" {
return nil, errors.New("StatObject content-length not found")
} else {
res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("StatObject content-length parse error: %w", err)
}
if res.Size < 0 {
return nil, errors.New("StatObject content-length must be greater than 0")
}
}
if lastModified := header.Get("Last-Modified"); lastModified == "" {
res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("StatObject content-length parse error: %w", err)
}
if res.Size < 0 {
return nil, errors.New("StatObject content-length must be greater than 0")
}
lastModified := header.Get("Last-Modified")
if lastModified == "" {
return nil, errors.New("StatObject last-modified not found")
} else {
res.LastModified, err = time.Parse(http.TimeFormat, lastModified)
if err != nil {
return nil, fmt.Errorf("StatObject last-modified parse error: %w", err)
}
}
res.LastModified, err = time.Parse(http.TimeFormat, lastModified)
if err != nil {
return nil, fmt.Errorf("StatObject last-modified parse error: %w", err)
}
return res, nil
}
@ -222,6 +223,7 @@ func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO
if err != nil {
return nil, err
}
return &s3.CopyObjectInfo{
Key: dst,
ETag: strings.ToLower(strings.ReplaceAll(result.ETag, `"`, ``)),
@ -229,6 +231,7 @@ func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyO
}
func (o *OSS) IsNotFound(err error) bool {
//nolint:errorlint //this is exactly what we want,there is no risk for no wrapped errors
switch e := err.(type) {
case oss.ServiceError:
return e.StatusCode == http.StatusNotFound || e.Code == "NoSuchKey"
@ -271,6 +274,7 @@ func (o *OSS) ListUploadedParts(ctx context.Context, uploadID string, name strin
Size: int64(part.Size),
}
}
return res, nil
}
@ -278,39 +282,7 @@ func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration,
publicRead := config.Config.Object.Oss.PublicRead
var opts []oss.Option
if opt != nil {
if opt.Image != nil {
// 文档地址: https://help.aliyun.com/zh/oss/user-guide/resize-images-4?spm=a2c4g.11186623.0.0.4b3b1e4fWW6yji
var format string
switch opt.Image.Format {
case
imagePng,
imageJpg,
imageJpeg,
imageGif,
imageWebp:
format = opt.Image.Format
default:
opt.Image.Format = imageJpg
}
// https://oss-console-img-demo-cn-hangzhou.oss-cn-hangzhou.aliyuncs.com/example.jpg?x-oss-process=image/resize,h_100,m_lfit
process := "image/resize,m_lfit"
if opt.Image.Width > 0 {
process += ",w_" + strconv.Itoa(opt.Image.Width)
}
if opt.Image.Height > 0 {
process += ",h_" + strconv.Itoa(opt.Image.Height)
}
process += ",format," + format
opts = append(opts, oss.Process(process))
}
if !publicRead {
if opt.ContentType != "" {
opts = append(opts, oss.ResponseContentType(opt.ContentType))
}
if opt.Filename != "" {
opts = append(opts, oss.ResponseContentDisposition(`attachment; filename=`+strconv.Quote(opt.Filename)))
}
}
opts = optsForAccessURL(opt, opts, publicRead)
}
if expire <= 0 {
expire = time.Hour * 24 * 365 * 99 // 99 years
@ -325,5 +297,44 @@ func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration,
return "", err
}
params := getURLParams(*o.bucket.Client.Conn, rawParams)
return getURL(o.um, o.bucket.BucketName, name, params).String(), nil
}
func optsForAccessURL(opt *s3.AccessURLOption, opts []oss.Option, publicRead bool) []oss.Option {
if opt.Image != nil {
// 文档地址: https://help.aliyun.com/zh/oss/user-guide/resize-images-4?spm=a2c4g.11186623.0.0.4b3b1e4fWW6yji
var format string
switch opt.Image.Format {
case
imagePng,
imageJpg,
imageJpeg,
imageGif,
imageWebp:
format = opt.Image.Format
default:
opt.Image.Format = imageJpg
}
// https://oss-console-img-demo-cn-hangzhou.oss-cn-hangzhou.aliyuncs.com/example.jpg?x-oss-process=image/resize,h_100,m_lfit
process := "image/resize,m_lfit"
if opt.Image.Width > 0 {
process += ",w_" + strconv.Itoa(opt.Image.Width)
}
if opt.Image.Height > 0 {
process += ",h_" + strconv.Itoa(opt.Image.Height)
}
process += ",format," + format
opts = append(opts, oss.Process(process))
}
if !publicRead {
if opt.ContentType != "" {
opts = append(opts, oss.ResponseContentType(opt.ContentType))
}
if opt.Filename != "" {
opts = append(opts, oss.ResponseContentDisposition(`attachment; filename=`+strconv.Quote(opt.Filename)))
}
}
return opts
}

@ -30,7 +30,7 @@ type GroupModel struct {
Introduction string `gorm:"column:introduction;size:255" json:"introduction"`
FaceURL string `gorm:"column:face_url;size:255" json:"faceURL"`
CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"`
Ex string `gorm:"column:ex" json:"ex;size:1024"`
Ex string `gorm:"column:ex;size:1024" json:"ex"`
Status int32 `gorm:"column:status"`
CreatorUserID string `gorm:"column:creator_user_id;size:64"`
GroupType int32 `gorm:"column:group_type"`

@ -15,6 +15,8 @@
package relation
import (
"errors"
"gorm.io/gorm"
"github.com/OpenIMSDK/tools/utils"
@ -32,5 +34,5 @@ type GroupSimpleUserID struct {
}
func IsNotFound(err error) bool {
return utils.Unwrap(err) == gorm.ErrRecordNotFound
return errors.Is(utils.Unwrap(err), gorm.ErrRecordNotFound)
}

@ -150,6 +150,7 @@ func (m *MsgDocModel) IsFull() bool {
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {
seqSuffix := (seq - 1) / singleGocMsgNum
return m.indexGen(conversationID, seqSuffix)
}
@ -164,6 +165,7 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st
t[docID] = append(value, seqs[i])
}
}
return t
}
@ -181,5 +183,6 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkw
msgModel.Seq = v
exceptionMsg = append(exceptionMsg, msgModel)
}
return exceptionMsg
}

@ -16,6 +16,7 @@ package unrelation
import (
"context"
"errors"
"fmt"
"strings"
"time"
@ -44,27 +45,12 @@ type Mongo struct {
// NewMongo Initialize MongoDB connection.
func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
// uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
var uri string
if config.Config.Mongo.Uri != "" {
uri = config.Config.Mongo.Uri
} else {
mongodbHosts := ""
for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 {
mongodbHosts += v
} else {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
uri = defaultMongoUriForNewMongo()
}
fmt.Println("mongo:", uri)
var mongoClient *mongo.Client
@ -76,17 +62,41 @@ func NewMongo() (*Mongo, error) {
if err == nil {
return &Mongo{db: mongoClient}, nil
}
if cmdErr, ok := err.(mongo.CommandError); ok {
var cmdErr mongo.CommandError
if errors.As(err, &cmdErr) {
if cmdErr.Code == 13 || cmdErr.Code == 18 {
return nil, err
} else {
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
}
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
}
}
return nil, err
}
func defaultMongoUriForNewMongo() string {
var uri string
mongodbHosts := ""
for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 {
mongodbHosts += v
} else {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
return uri
}
func (m *Mongo) GetClient() *mongo.Client {
return m.db
}
@ -106,6 +116,7 @@ func (m *Mongo) CreateSuperGroupIndex() error {
if err := m.createMongoIndex(unrelation.CUserToSuperGroup, true, "user_id"); err != nil {
return err
}
return nil
}
@ -139,5 +150,6 @@ func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...strin
if err != nil {
return utils.Wrap(err, result)
}
return nil
}

@ -49,6 +49,7 @@ type MsgMongoDriver struct {
func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface {
collection := database.Collection(table.MsgDocModel{}.TableName())
return &MsgMongoDriver{MsgCollection: collection}
}
@ -59,6 +60,7 @@ func (m *MsgMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsTo
func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) error {
_, err := m.MsgCollection.InsertOne(ctx, model)
return err
}
@ -81,6 +83,7 @@ func (m *MsgMongoDriver) UpdateMsg(
if err != nil {
return nil, utils.Wrap(err, "")
}
return res, nil
}
@ -108,6 +111,7 @@ func (m *MsgMongoDriver) PushUnique(
if err != nil {
return nil, utils.Wrap(err, "")
}
return res, nil
}
@ -120,6 +124,7 @@ func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, ind
if err != nil {
return utils.Wrap(err, "")
}
return nil
}
@ -143,12 +148,14 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(
if err != nil {
return utils.Wrap(err, "")
}
return nil
}
func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*table.MsgDocModel, error) {
doc := &table.MsgDocModel{}
err := m.MsgCollection.FindOne(ctx, bson.M{"doc_id": docID}).Decode(doc)
return doc, err
}
@ -177,6 +184,7 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex(
if len(msgs) > 0 {
return &msgs[0], nil
}
return nil, ErrMsgListNotExist
}
@ -225,6 +233,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st
if err != nil {
return utils.Wrap(err, "")
}
return nil
}
@ -233,6 +242,7 @@ func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error
return nil
}
_, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": bson.M{"$in": docIDs}})
return err
}
@ -246,6 +256,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
for _, seq := range seqs {
indexs = append(indexs, m.model.GetMsgIndex(seq))
}
//nolint:govet //This is already the officially recommended standard practice.
pipeline := mongo.Pipeline{
{
{"$match", bson.D{
@ -336,6 +347,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
}
msgs = append(msgs, msg)
}
return msgs, nil
}
@ -344,6 +356,7 @@ func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool,
if err != nil {
return false, errs.Wrap(err)
}
return count > 0, nil
}
@ -372,6 +385,7 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(
updates = append(updates, updateModel)
}
_, err := m.MsgCollection.BulkWrite(ctx, updates)
return err
}
@ -611,7 +625,39 @@ func (m *MsgMongoDriver) RangeUserSendCount(
},
)
}
pipeline := bson.A{
pipeline := buildPiplineForRangeUserSendCount(or, start, end, sort, pageNumber, showNumber)
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
if err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
defer cur.Close(ctx)
var result []Result
if err = cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
if len(result) == 0 {
return 0, 0, nil, nil, errs.Wrap(err)
}
users = make([]*table.UserCount, len(result[0].Users))
for i, r := range result[0].Users {
users[i] = &table.UserCount{
UserID: r.UserID,
Count: r.Count,
}
}
dateCount = make(map[string]int64)
for _, r := range result[0].Dates {
dateCount[r.Date] = r.Count
}
return result[0].MsgCount, result[0].UserCount, users, dateCount, nil
}
//nolint:funlen // it need to be such long
func buildPiplineForRangeUserSendCount(or bson.A, start time.Time,
end time.Time, sort int, pageNumber, showNumber int32,
) bson.A {
return bson.A{
bson.M{
"$match": bson.M{
"$and": bson.A{
@ -795,30 +841,6 @@ func (m *MsgMongoDriver) RangeUserSendCount(
},
},
}
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
if err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
defer cur.Close(ctx)
var result []Result
if err := cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
if len(result) == 0 {
return 0, 0, nil, nil, errs.Wrap(err)
}
users = make([]*table.UserCount, len(result[0].Users))
for i, r := range result[0].Users {
users[i] = &table.UserCount{
UserID: r.UserID,
Count: r.Count,
}
}
dateCount = make(map[string]int64)
for _, r := range result[0].Dates {
dateCount[r.Date] = r.Count
}
return result[0].MsgCount, result[0].UserCount, users, dateCount, nil
}
func (m *MsgMongoDriver) RangeGroupSendCount(
@ -847,7 +869,39 @@ func (m *MsgMongoDriver) RangeGroupSendCount(
Count int64 `bson:"count"`
} `bson:"dates"`
}
pipeline := bson.A{
pipeline := buildPiplineForRangeGroupSendCount(start, end, sort, pageNumber, showNumber)
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
if err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
defer cur.Close(ctx)
var result []Result
if err = cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
if len(result) == 0 {
return 0, 0, nil, nil, errs.Wrap(err)
}
groups = make([]*table.GroupCount, len(result[0].Groups))
for i, r := range result[0].Groups {
groups[i] = &table.GroupCount{
GroupID: r.GroupID,
Count: r.Count,
}
}
dateCount = make(map[string]int64)
for _, r := range result[0].Dates {
dateCount[r.Date] = r.Count
}
return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil
}
//nolint:funlen //it need to has such length
func buildPiplineForRangeGroupSendCount(start time.Time,
end time.Time, sort int, pageNumber, showNumber int32,
) bson.A {
return bson.A{
bson.M{
"$match": bson.M{
"$and": bson.A{
@ -1044,30 +1098,6 @@ func (m *MsgMongoDriver) RangeGroupSendCount(
},
},
}
cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true))
if err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
defer cur.Close(ctx)
var result []Result
if err := cur.All(ctx, &result); err != nil {
return 0, 0, nil, nil, errs.Wrap(err)
}
if len(result) == 0 {
return 0, 0, nil, nil, errs.Wrap(err)
}
groups = make([]*table.GroupCount, len(result[0].Groups))
for i, r := range result[0].Groups {
groups[i] = &table.GroupCount{
GroupID: r.GroupID,
Count: r.Count,
}
}
dateCount = make(map[string]int64)
for _, r := range result[0].Dates {
dateCount[r.Date] = r.Count
}
return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil
}
func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*table.MsgInfoModel, error) {
@ -1075,6 +1105,7 @@ func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessa
if err != nil {
return 0, nil, err
}
return total, msgs, nil
}
@ -1119,7 +1150,7 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
},
},
)
//nolint:govet //this is already standard
pipe = mongo.Pipeline{
{
{"$match", bson.D{
@ -1214,5 +1245,6 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
} else {
msgs = msgs[start:]
}
return n, msgs, nil
}

@ -31,12 +31,14 @@ func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex})
if err != nil {
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
continue
}
var msgDocs []table.MsgDocModel
err = cursor.All(ctx, &msgDocs)
if err != nil {
log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID)
continue
}
if len(msgDocs) < 1 {
@ -44,39 +46,45 @@ func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs
}
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) {
if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil {
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
continue
}
var newMsgDocs []interface{}
for _, msgDoc := range msgDocs {
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
continue
}
var index int64
for index < int64(len(msgDoc.Msg)) {
msg := msgDoc.Msg[index]
if msg != nil && msg.Msg != nil {
msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
end := index + m.model.GetSingleGocMsgNum()
if int(end) >= len(msgDoc.Msg) {
msgDocModel.Msg = msgDoc.Msg[index:]
} else {
msgDocModel.Msg = msgDoc.Msg[index:end]
}
newMsgDocs = append(newMsgDocs, msgDocModel)
index = end
} else {
break
}
convertMsgDocs(m, ctx, msgDocs, conversationID, regex)
}
}
}
func convertMsgDocs(m *MsgMongoDriver, ctx context.Context, msgDocs []table.MsgDocModel, conversationID string, regex primitive.Regex) {
var err error
if _, err = m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil {
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
return
}
var newMsgDocs []interface{}
for _, msgDoc := range msgDocs {
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
continue
}
var index int64
for index < int64(len(msgDoc.Msg)) {
msg := msgDoc.Msg[index]
if msg != nil && msg.Msg != nil {
msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
end := index + m.model.GetSingleGocMsgNum()
if int(end) >= len(msgDoc.Msg) {
msgDocModel.Msg = msgDoc.Msg[index:]
} else {
msgDocModel.Msg = msgDoc.Msg[index:end]
}
}
_, err = m.MsgCollection.InsertMany(ctx, newMsgDocs)
if err != nil {
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
newMsgDocs = append(newMsgDocs, msgDocModel)
index = end
} else {
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
break
}
}
}
_, err = m.MsgCollection.InsertMany(ctx, newMsgDocs)
if err != nil {
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
} else {
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
}
}

@ -59,6 +59,7 @@ func (s *SuperGroupMongoDriver) CreateSuperGroup(ctx context.Context, groupID st
return err
}
}
return nil
}
@ -69,6 +70,7 @@ func (s *SuperGroupMongoDriver) TakeSuperGroup(
if err := s.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&group); err != nil {
return nil, utils.Wrap(err, "")
}
return group, nil
}
@ -86,6 +88,7 @@ func (s *SuperGroupMongoDriver) FindSuperGroup(
if err := cursor.All(ctx, &groups); err != nil {
return nil, utils.Wrap(err, "")
}
return groups, nil
}
@ -113,6 +116,7 @@ func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID
return utils.Wrap(err, "transaction failed")
}
}
return nil
}
@ -129,6 +133,7 @@ func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, g
if err != nil {
return err
}
return nil
}
@ -138,6 +143,7 @@ func (s *SuperGroupMongoDriver) GetSuperGroupByUserID(
) (*unrelation.UserToSuperGroupModel, error) {
var user unrelation.UserToSuperGroupModel
err := s.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user)
return &user, utils.Wrap(err, "")
}
@ -149,6 +155,7 @@ func (s *SuperGroupMongoDriver) DeleteSuperGroup(ctx context.Context, groupID st
if _, err := s.superGroupCollection.DeleteOne(ctx, bson.M{"group_id": groupID}); err != nil {
return utils.Wrap(err, "")
}
return s.RemoveGroupFromUser(ctx, groupID, group.MemberIDs)
}
@ -158,5 +165,6 @@ func (s *SuperGroupMongoDriver) RemoveGroupFromUser(ctx context.Context, groupID
bson.M{"user_id": bson.M{"$in": userIDs}},
bson.M{"$pull": bson.M{"group_id_list": groupID}},
)
return utils.Wrap(err, "")
}

@ -16,6 +16,7 @@ package unrelation
import (
"context"
"errors"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils"
@ -50,6 +51,7 @@ type UserMongoDriver struct {
// AddSubscriptionList Subscriber's handling of thresholds.
func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error {
// Check the number of lists in the key.
//nolint:govet //this has already been the standard format for mongo.Pipeline
pipeline := mongo.Pipeline{
{{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}},
{{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}},
@ -65,7 +67,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string
}
// iterate over aggregated results
for cursor.Next(ctx) {
err := cursor.Decode(&cnt)
err = cursor.Decode(&cnt)
if err != nil {
return errs.Wrap(err)
}
@ -122,6 +124,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string
return utils.Wrap(err, "transaction failed")
}
}
return nil
}
@ -139,6 +142,7 @@ func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string,
if err != nil {
return errs.Wrap(err)
}
return nil
}
@ -152,6 +156,7 @@ func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, user
bson.M{"$pull": bson.M{"user_id_list": userID}},
)
}
return errs.Wrap(err)
}
@ -163,12 +168,13 @@ func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string
bson.M{"user_id": SubscriptionPrefix + userID})
err = cursor.Decode(&user)
if err != nil {
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
return []string{}, nil
} else {
return nil, errs.Wrap(err)
}
return nil, errs.Wrap(err)
}
return user.UserIDList, nil
}
@ -180,11 +186,12 @@ func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string)
bson.M{"user_id": SubscribedPrefix + userID})
err = cursor.Decode(&user)
if err != nil {
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
return []string{}, nil
} else {
return nil, errs.Wrap(err)
}
return nil, errs.Wrap(err)
}
return user.UserIDList, nil
}

Loading…
Cancel
Save