Merge remote-tracking branch 'upstream/main' into main

# Conflicts:
#	internal/msggateway/n_ws_server.go
#	internal/msgtransfer/init.go
pull/1337/head
lin.huang 2 years ago
commit dfd36a6b7a

@ -28,6 +28,7 @@ import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"

@ -38,7 +38,7 @@ require github.com/google/uuid v1.3.1
require (
github.com/IBM/sarama v1.41.3
github.com/OpenIMSDK/protocol v0.0.30
github.com/OpenIMSDK/tools v0.0.15
github.com/OpenIMSDK/tools v0.0.16
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.7.1

@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.30 h1:MiHO6PyQMR9ojBHNnSFxCHLmsoE2xZqaiYj975JiZnM=
github.com/OpenIMSDK/protocol v0.0.30/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.15 h1:FF3m0TQUG56pJC15a11jmBG6Y1EjXarEW4JV3CBF/Jc=
github.com/OpenIMSDK/tools v0.0.15/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/OpenIMSDK/tools v0.0.16 h1:te/GIq2imCMsrRPgU9OObYKbzZ3rT08Lih/o+3QFIz0=
github.com/OpenIMSDK/tools v0.0.16/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=

@ -75,8 +75,8 @@ type WsServer struct {
kickHandlerChan chan *kickHandler
clients *UserMap
clientPool sync.Pool
onlineUserNum int64
onlineUserConnNum int64
onlineUserNum atomic.Int64
onlineUserConnNum atomic.Int64
handshakeTimeout time.Duration
hubServer *Server
validate *validator.Validate
@ -221,9 +221,9 @@ func (ws *WsServer) registerClient(client *Client) {
if !userOK {
ws.clients.Set(client.UserID, client)
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
prom_metrics.OnlineUserGauge.Add(1)
ws.onlineUserNum.Add(1)
ws.onlineUserConnNum.Add(1)
} else {
i := &kickHandler{
clientOK: clientOK,
@ -236,22 +236,35 @@ func (ws *WsServer) registerClient(client *Client) {
ws.clients.Set(client.UserID, client)
// 已经有同平台的连接存在
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
atomic.AddInt64(&ws.onlineUserConnNum, 1)
ws.onlineUserConnNum.Add(1)
} else {
ws.clients.Set(client.UserID, client)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
ws.onlineUserConnNum.Add(1)
}
}
ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()
wg.Add(1)
go func() {
defer wg.Done()
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
}()
wg.Wait()
log.ZInfo(
client.ctx,
"user online",
"online user Num",
ws.onlineUserNum,
ws.onlineUserNum.Load(),
"online user conn Num",
ws.onlineUserConnNum,
ws.onlineUserConnNum.Load(),
)
}
@ -284,7 +297,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
if clientOK {
isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients)
if isDeleteUser {
atomic.AddInt64(&ws.onlineUserNum, -1)
ws.onlineUserNum.Add(-1)
}
for _, c := range oldClients {
err := c.KickOnlineMessage()
@ -352,19 +365,19 @@ func (ws *WsServer) unregisterClient(client *Client) {
defer ws.clientPool.Put(client)
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
if isDeleteUser {
ws.onlineUserNum.Add(-1)
prom_metrics.OnlineUserGauge.Dec()
atomic.AddInt64(&ws.onlineUserNum, -1)
}
atomic.AddInt64(&ws.onlineUserConnNum, -1)
ws.onlineUserConnNum.Add(-1)
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num",
ws.onlineUserConnNum,
ws.onlineUserConnNum.Load(),
)
}
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
connContext := newContext(w, r)
if ws.onlineUserConnNum >= ws.wsMaxConnNum {
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
httpError(connContext, errs.ErrConnOverMaxNumLimit)
return
}

@ -112,7 +112,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers
return resp, nil
}
//nolint
// nolint
func (c *conversationServer) SetConversations(ctx context.Context,
req *pbconversation.SetConversationsReq,
) (*pbconversation.SetConversationsResp, error) {

@ -67,7 +67,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
var o s3.Interface
switch config.Config.Object.Enable {
case "minio":
o, err = minio.NewMinio()
o, err = minio.NewMinio(cache.NewMinioCache(rdb))
case "cos":
o, err = cos.NewCos()
case "oss":
@ -78,11 +78,17 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil {
return err
}
//specialerror.AddErrHandler(func(err error) errs.CodeError {
// if o.IsNotFound(err) {
// return errs.ErrRecordNotFound
// }
// return nil
//})
third.RegisterThirdServer(server, &thirdServer{
apiURL: apiURL,
thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb), db),
userRpcClient: rpcclient.NewUserRpcClient(client),
s3dataBase: controller.NewS3Database(o, relation.NewObjectInfo(db)),
s3dataBase: controller.NewS3Database(rdb, o, relation.NewObjectInfo(db)),
defaultExpire: time.Hour * 24 * 7,
})
return nil

@ -16,49 +16,126 @@ package tools
import (
"context"
"time"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"math/rand"
"time"
)
//func (c *MsgTool) ConversationsDestructMsgs() {
// log.ZInfo(context.Background(), "start msg destruct cron task")
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx)
// if err != nil {
// log.ZError(ctx, "get conversation id need destruct failed", err)
// return
// }
// log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
// for _, conversation := range conversations {
// ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
// log.ZDebug(
// ctx,
// "UserMsgsDestruct",
// "conversationID",
// conversation.ConversationID,
// "ownerUserID",
// conversation.OwnerUserID,
// "msgDestructTime",
// conversation.MsgDestructTime,
// "lastMsgDestructTime",
// conversation.LatestMsgDestructTime,
// )
// now := time.Now()
// seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
// if err != nil {
// log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// continue
// }
// if len(seqs) > 0 {
// if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil {
// log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// continue
// }
// if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
// log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// }
// }
// }
//}
func (c *MsgTool) ConversationsDestructMsgs() {
log.ZInfo(context.Background(), "start msg destruct cron task")
ctx := mcontext.NewCtx(utils.GetSelfFuncName())
conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx)
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "get conversation id need destruct failed", err)
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return
}
log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
for _, conversation := range conversations {
ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(
ctx,
"UserMsgsDestruct",
"conversationID",
conversation.ConversationID,
"ownerUserID",
conversation.OwnerUserID,
"msgDestructTime",
conversation.MsgDestructTime,
"lastMsgDestructTime",
conversation.LatestMsgDestructTime,
)
now := time.Now()
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
const batchNum = 50
log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
if num == 0 {
return
}
count := int(num/batchNum + num/batchNum/2)
if count < 1 {
count = 1
}
maxPage := 1 + num/batchNum
if num%batchNum != 0 {
maxPage++
}
for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum)
if err != nil {
log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
if len(seqs) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}
temp := make([]*relation.ConversationModel, 0, len(conversations))
for i, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && (time.Now().Unix() > (conversation.MsgDestructTime+conversation.LatestMsgDestructTime.Unix()+8*60*60)) || conversation.LatestMsgDestructTime.IsZero() {
temp = append(temp, conversations[i])
}
}
for _, conversation := range temp {
ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(
ctx,
"UserMsgsDestruct",
"conversationID",
conversation.ConversationID,
"ownerUserID",
conversation.OwnerUserID,
"msgDestructTime",
conversation.MsgDestructTime,
"lastMsgDestructTime",
conversation.LatestMsgDestructTime,
)
now := time.Now()
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
if len(seqs) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
}
}
}
}

@ -39,13 +39,13 @@ func StartTask() error {
log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
_, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq)
if err != nil {
fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime)
log.ZError(context.Background(), "start allConversationClearMsgAndFixSeq cron failed", err)
panic(err)
}
log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
_, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
if err != nil {
fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
log.ZError(context.Background(), "start conversationsDestructMsgs cron failed", err)
panic(err)
}
c.Start()

@ -29,6 +29,7 @@ import (
"github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"math/rand"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
@ -102,18 +103,55 @@ func InitMsgTool() (*MsgTool, error) {
return msgTool, nil
}
//func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// log.ZInfo(ctx, "============================ start del cron task ============================")
// conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
// if err != nil {
// log.ZError(ctx, "GetAllConversationIDs failed", err)
// return
// }
// for _, conversationID := range conversationIDs {
// conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
// }
// c.ClearConversationsMsg(ctx, conversationIDs)
// log.ZInfo(ctx, "============================ start del cron finished ============================")
//}
func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
ctx := mcontext.NewCtx(utils.GetSelfFuncName())
log.ZInfo(ctx, "============================ start del cron task ============================")
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDs failed", err)
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
const batchNum = 50
log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
if num == 0 {
return
}
count := int(num/batchNum + num/batchNum/2)
if count < 1 {
count = 1
}
maxPage := 1 + num/batchNum
if num%batchNum != 0 {
maxPage++
}
for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
c.ClearConversationsMsg(ctx, conversationIDs)
}
c.ClearConversationsMsg(ctx, conversationIDs)
log.ZInfo(ctx, "============================ start del cron finished ============================")
}

@ -41,7 +41,7 @@ type SendMsgReq struct {
type BatchSendMsgReq struct {
SendMsg
IsSendAll bool `json:"isSendAll"`
RecvIDs []string `json:"recvIDs" binding:"required"`
RecvIDs []string `json:"recvIDs" binding:"required"`
}
type BatchSendMsgResp struct {

@ -16,9 +16,11 @@ package cmd
import (
"fmt"
"github.com/OpenIMSDK/protocol/constant"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
type ApiCmd struct {

@ -16,11 +16,13 @@ package cmd
import (
"errors"
"github.com/OpenIMSDK/protocol/constant"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
"google.golang.org/grpc"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"

@ -21,8 +21,9 @@ import (
"path/filepath"
"github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"gopkg.in/yaml.v3"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
)
//go:embed version

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/OpenIMSDK/tools/mw/specialerror"
"time"
"github.com/dtm-labs/rockscache"
@ -209,6 +210,9 @@ func batchGetCache2[T any, K comparable](
return fns(ctx, key)
})
if err != nil {
if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) {
continue
}
return nil, err
}
res = append(res, val)

@ -0,0 +1,190 @@
package cache
import (
"context"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
type ObjectCache interface {
metaCache
GetName(ctx context.Context, name string) (*relationtb.ObjectModel, error)
DelObjectName(names ...string) ObjectCache
}
func NewObjectCacheRedis(rdb redis.UniversalClient, objDB relationtb.ObjectInfoModelInterface) ObjectCache {
rcClient := rockscache.NewClient(rdb, rockscache.NewDefaultOptions())
return &objectCacheRedis{
rcClient: rcClient,
expireTime: time.Hour * 12,
objDB: objDB,
metaCache: NewMetaCacheRedis(rcClient),
}
}
type objectCacheRedis struct {
metaCache
objDB relationtb.ObjectInfoModelInterface
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *objectCacheRedis) NewCache() ObjectCache {
return &objectCacheRedis{
rcClient: g.rcClient,
expireTime: g.expireTime,
objDB: g.objDB,
metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...),
}
}
func (g *objectCacheRedis) DelObjectName(names ...string) ObjectCache {
objectCache := g.NewCache()
keys := make([]string, 0, len(names))
for _, name := range names {
keys = append(keys, g.getObjectKey(name))
}
objectCache.AddKeys(keys...)
return objectCache
}
func (g *objectCacheRedis) getObjectKey(name string) string {
return "OBJECT:" + name
}
func (g *objectCacheRedis) GetName(ctx context.Context, name string) (*relationtb.ObjectModel, error) {
return getCache(ctx, g.rcClient, g.getObjectKey(name), g.expireTime, func(ctx context.Context) (*relationtb.ObjectModel, error) {
return g.objDB.Take(ctx, name)
})
}
type S3Cache interface {
metaCache
GetKey(ctx context.Context, engine string, key string) (*s3.ObjectInfo, error)
DelS3Key(engine string, keys ...string) S3Cache
}
func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) S3Cache {
rcClient := rockscache.NewClient(rdb, rockscache.NewDefaultOptions())
return &s3CacheRedis{
rcClient: rcClient,
expireTime: time.Hour * 12,
s3: s3,
metaCache: NewMetaCacheRedis(rcClient),
}
}
type s3CacheRedis struct {
metaCache
s3 s3.Interface
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *s3CacheRedis) NewCache() S3Cache {
return &s3CacheRedis{
rcClient: g.rcClient,
expireTime: g.expireTime,
s3: g.s3,
metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...),
}
}
func (g *s3CacheRedis) DelS3Key(engine string, keys ...string) S3Cache {
s3cache := g.NewCache()
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getS3Key(engine, key))
}
s3cache.AddKeys(ks...)
return s3cache
}
func (g *s3CacheRedis) getS3Key(engine string, name string) string {
return "S3:" + engine + ":" + name
}
func (g *s3CacheRedis) GetKey(ctx context.Context, engine string, name string) (*s3.ObjectInfo, error) {
return getCache(ctx, g.rcClient, g.getS3Key(engine, name), g.expireTime, func(ctx context.Context) (*s3.ObjectInfo, error) {
return g.s3.StatObject(ctx, name)
})
}
type MinioCache interface {
metaCache
GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*MinioImageInfo, error)) (*MinioImageInfo, error)
GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error)
DelObjectImageInfoKey(keys ...string) MinioCache
DelImageThumbnailKey(key string, format string, width int, height int) MinioCache
}
func NewMinioCache(rdb redis.UniversalClient) MinioCache {
rcClient := rockscache.NewClient(rdb, rockscache.NewDefaultOptions())
return &minioCacheRedis{
rcClient: rcClient,
expireTime: time.Hour * 24 * 7,
metaCache: NewMetaCacheRedis(rcClient),
}
}
type minioCacheRedis struct {
metaCache
rcClient *rockscache.Client
expireTime time.Duration
}
func (g *minioCacheRedis) NewCache() MinioCache {
return &minioCacheRedis{
rcClient: g.rcClient,
expireTime: g.expireTime,
metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...),
}
}
func (g *minioCacheRedis) DelObjectImageInfoKey(keys ...string) MinioCache {
s3cache := g.NewCache()
ks := make([]string, 0, len(keys))
for _, key := range keys {
ks = append(ks, g.getObjectImageInfoKey(key))
}
s3cache.AddKeys(ks...)
return s3cache
}
func (g *minioCacheRedis) DelImageThumbnailKey(key string, format string, width int, height int) MinioCache {
s3cache := g.NewCache()
s3cache.AddKeys(g.getMinioImageThumbnailKey(key, format, width, height))
return s3cache
}
func (g *minioCacheRedis) getObjectImageInfoKey(key string) string {
return "MINIO:IMAGE:" + key
}
func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
return "MINIO:THUMBNAIL:" + format + ":w" + strconv.Itoa(width) + ":h" + strconv.Itoa(height) + ":" + key
}
func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*MinioImageInfo, error)) (*MinioImageInfo, error) {
info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn)
if err != nil {
return nil, err
}
return info, nil
}
func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
}
type MinioImageInfo struct {
IsImg bool `json:"isImg"`
Width int `json:"width"`
Height int `json:"height"`
Format string `json:"format"`
Etag string `json:"etag"`
}

@ -50,6 +50,8 @@ type ConversationDatabase interface {
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error)
//GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error)
@ -295,6 +297,14 @@ func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]str
return c.conversationDB.GetAllConversationIDs(ctx)
}
func (c *conversationDatabase) GetAllConversationIDsNumber(ctx context.Context) (int64, error) {
return c.conversationDB.GetAllConversationIDsNumber(ctx)
}
func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) {
return c.conversationDB.PageConversationIDs(ctx, pageNumber, showNumber)
}
//func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
// return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID)
//}

@ -16,12 +16,13 @@ package controller
import (
"context"
"path/filepath"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cont"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/redis/go-redis/v9"
"path/filepath"
"time"
)
type S3Database interface {
@ -34,16 +35,18 @@ type S3Database interface {
SetObject(ctx context.Context, info *relation.ObjectModel) error
}
func NewS3Database(s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database {
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database {
return &s3Database{
s3: cont.New(s3),
obj: obj,
s3: cont.New(cache.NewS3Cache(rdb, s3), s3),
cache: cache.NewObjectCacheRedis(rdb, obj),
db: obj,
}
}
type s3Database struct {
s3 *cont.Controller
obj relation.ObjectInfoModelInterface
s3 *cont.Controller
cache cache.ObjectCache
db relation.ObjectInfoModelInterface
}
func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) {
@ -67,11 +70,14 @@ func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID strin
}
func (s *s3Database) SetObject(ctx context.Context, info *relation.ObjectModel) error {
return s.obj.SetObject(ctx, info)
if err := s.db.SetObject(ctx, info); err != nil {
return err
}
return s.cache.DelObjectName(info.Name).ExecDel(ctx)
}
func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error) {
obj, err := s.obj.Take(ctx, name)
obj, err := s.cache.GetName(ctx, name)
if err != nil {
return time.Time{}, "", err
}

@ -16,9 +16,7 @@ package relation
import (
"context"
"github.com/OpenIMSDK/tools/errs"
"gorm.io/gorm"
"github.com/OpenIMSDK/protocol/constant"
@ -188,6 +186,18 @@ func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversat
)
}
func (c *ConversationGorm) GetAllConversationIDsNumber(ctx context.Context) (int64, error) {
var num int64
err := c.db(ctx).Select("COUNT(DISTINCT conversation_id)").Model(&relation.ConversationModel{}).Count(&num).Error
return num, errs.Wrap(err)
}
func (c *ConversationGorm) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) {
err = c.db(ctx).Distinct("conversation_id").Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("conversation_id", &conversationIDs).Error
err = errs.Wrap(err)
return
}
func (c *ConversationGorm) GetUserAllHasReadSeqs(
ctx context.Context,
ownerUserID string,

@ -22,10 +22,11 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mw/specialerror"
mysqldriver "github.com/go-sql-driver/mysql"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
const (

@ -20,6 +20,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"path"
"strings"
"time"
@ -32,12 +33,16 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3"
)
func New(impl s3.Interface) *Controller {
return &Controller{impl: impl}
func New(cache cache.S3Cache, impl s3.Interface) *Controller {
return &Controller{
cache: cache,
impl: impl,
}
}
type Controller struct {
impl s3.Interface
cache cache.S3Cache
impl s3.Interface
}
func (c *Controller) HashPath(md5 string) string {
@ -69,8 +74,12 @@ func (c *Controller) PartLimit() *s3.PartLimit {
return c.impl.PartLimit()
}
func (c *Controller) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) {
return c.cache.GetKey(ctx, c.impl.Engine(), name)
}
func (c *Controller) GetHashObject(ctx context.Context, hash string) (*s3.ObjectInfo, error) {
return c.impl.StatObject(ctx, c.HashPath(hash))
return c.StatObject(ctx, c.HashPath(hash))
}
func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*InitiateUploadResult, error) {
@ -94,7 +103,7 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64
if maxParts > 0 && partNumber > 0 && partNumber < maxParts {
return nil, errors.New(fmt.Sprintf("too many parts: %d", partNumber))
}
if info, err := c.impl.StatObject(ctx, c.HashPath(hash)); err == nil {
if info, err := c.StatObject(ctx, c.HashPath(hash)); err == nil {
return nil, &HashAlreadyExistsError{Object: info}
} else if !c.impl.IsNotFound(err) {
return nil, err
@ -168,13 +177,13 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash)
return nil, errors.New("md5 mismatching")
}
if info, err := c.impl.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
return &UploadResult{
Key: info.Key,
Size: info.Size,
Hash: info.ETag,
}, nil
} else if !c.impl.IsNotFound(err) {
} else if !c.IsNotFound(err) {
return nil, err
}
cleanObject := make(map[string]struct{})
@ -200,7 +209,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
}
targetKey = result.Key
case UploadTypePresigned:
uploadInfo, err := c.impl.StatObject(ctx, upload.Key)
uploadInfo, err := c.StatObject(ctx, upload.Key)
if err != nil {
return nil, err
}
@ -230,6 +239,9 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
default:
return nil, errors.New("invalid upload id type")
}
if err := c.cache.DelS3Key(c.impl.Engine(), targetKey).ExecDel(ctx); err != nil {
return nil, err
}
return &UploadResult{
Key: targetKey,
Size: upload.Size,
@ -253,7 +265,7 @@ func (c *Controller) AuthSign(ctx context.Context, uploadID string, partNumbers
}
func (c *Controller) IsNotFound(err error) bool {
return c.impl.IsNotFound(err)
return c.impl.IsNotFound(err) || errs.ErrRecordNotFound.Is(err)
}
func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {

@ -15,20 +15,14 @@
package minio
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"image"
"image/gif"
"image/jpeg"
"image/png"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"io"
"net/http"
"net/url"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
@ -56,13 +50,13 @@ const (
)
const (
maxImageWidth = 1024
maxImageHeight = 1024
maxImageSize = 1024 * 1024 * 50
pathInfo = "openim/thumbnail"
maxImageWidth = 1024
maxImageHeight = 1024
maxImageSize = 1024 * 1024 * 50
imageThumbnailPath = "openim/thumbnail"
)
func NewMinio() (s3.Interface, error) {
func NewMinio(cache cache.MinioCache) (s3.Interface, error) {
u, err := url.Parse(config.Config.Object.Minio.Endpoint)
if err != nil {
return nil, err
@ -80,6 +74,7 @@ func NewMinio() (s3.Interface, error) {
core: &minio.Core{Client: client},
lock: &sync.Mutex{},
init: false,
cache: cache,
}
if config.Config.Object.Minio.SignEndpoint == "" || config.Config.Object.Minio.SignEndpoint == config.Config.Object.Minio.Endpoint {
m.opts = opts
@ -124,6 +119,7 @@ type Minio struct {
lock sync.Locker
init bool
prefix string
cache cache.MinioCache
}
func (m *Minio) initMinio(ctx context.Context) error {
@ -227,6 +223,7 @@ func (m *Minio) CompleteMultipartUpload(ctx context.Context, uploadID string, na
if err != nil {
return nil, err
}
m.delObjectImageInfoKey(ctx, name, upload.Size)
return &s3.CompleteMultipartUploadResult{
Location: upload.Location,
Bucket: upload.Bucket,
@ -389,7 +386,7 @@ func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name str
return res, nil
}
func (m *Minio) presignedGetObject(ctx context.Context, name string, expire time.Duration, query url.Values) (string, error) {
func (m *Minio) PresignedGetObject(ctx context.Context, name string, expire time.Duration, query url.Values) (string, error) {
if expire <= 0 {
expire = time.Hour * 24 * 365 * 99 // 99 years
} else if expire < time.Second {
@ -427,109 +424,9 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration
}
}
if opt.Image == nil || (opt.Image.Width < 0 && opt.Image.Height < 0 && opt.Image.Format == "") || (opt.Image.Width > maxImageWidth || opt.Image.Height > maxImageHeight) {
return m.presignedGetObject(ctx, name, expire, reqParams)
return m.PresignedGetObject(ctx, name, expire, reqParams)
}
fileInfo, err := m.StatObject(ctx, name)
if err != nil {
return "", err
}
if fileInfo.Size > maxImageSize {
return "", errors.New("file size too large")
}
objectInfoPath := path.Join(pathInfo, fileInfo.ETag, "image.json")
var (
img image.Image
info minioImageInfo
)
data, err := m.getObjectData(ctx, objectInfoPath, 1024)
if err == nil {
if err := json.Unmarshal(data, &info); err != nil {
return "", fmt.Errorf("unmarshal minio image info.json error: %w", err)
}
if info.NotImage {
return "", errors.New("not image")
}
} else if m.IsNotFound(err) {
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
if err != nil {
return "", err
}
defer reader.Close()
imageInfo, format, err := ImageStat(reader)
if err == nil {
info.NotImage = false
info.Format = format
info.Width, info.Height = ImageWidthHeight(imageInfo)
img = imageInfo
} else {
info.NotImage = true
}
data, err := json.Marshal(&info)
if err != nil {
return "", err
}
if _, err := m.core.Client.PutObject(ctx, m.bucket, objectInfoPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}); err != nil {
return "", err
}
} else {
return "", err
}
if opt.Image.Width > info.Width || opt.Image.Width <= 0 {
opt.Image.Width = info.Width
}
if opt.Image.Height > info.Height || opt.Image.Height <= 0 {
opt.Image.Height = info.Height
}
opt.Image.Format = strings.ToLower(opt.Image.Format)
if opt.Image.Format == formatJpg {
opt.Image.Format = formatJpeg
}
switch opt.Image.Format {
case formatPng:
case formatJpeg:
case formatGif:
default:
if info.Format == formatGif {
opt.Image.Format = formatGif
} else {
opt.Image.Format = formatJpeg
}
}
reqParams.Set("response-content-type", "image/"+opt.Image.Format)
if opt.Image.Width == info.Width && opt.Image.Height == info.Height && opt.Image.Format == info.Format {
return m.presignedGetObject(ctx, name, expire, reqParams)
}
cacheKey := filepath.Join(pathInfo, fileInfo.ETag, fmt.Sprintf("image_w%d_h%d.%s", opt.Image.Width, opt.Image.Height, opt.Image.Format))
if _, err := m.core.Client.StatObject(ctx, m.bucket, cacheKey, minio.StatObjectOptions{}); err == nil {
return m.presignedGetObject(ctx, cacheKey, expire, reqParams)
} else if !m.IsNotFound(err) {
return "", err
}
if img == nil {
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
if err != nil {
return "", err
}
defer reader.Close()
img, _, err = ImageStat(reader)
if err != nil {
return "", err
}
}
thumbnail := resizeImage(img, opt.Image.Width, opt.Image.Height)
buf := bytes.NewBuffer(nil)
switch opt.Image.Format {
case formatPng:
err = png.Encode(buf, thumbnail)
case formatJpeg:
err = jpeg.Encode(buf, thumbnail, nil)
case formatGif:
err = gif.Encode(buf, thumbnail, nil)
}
if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil {
return "", err
}
return m.presignedGetObject(ctx, cacheKey, expire, reqParams)
return m.getImageThumbnailURL(ctx, name, expire, opt.Image)
}
func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([]byte, error) {
@ -541,5 +438,5 @@ func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([]
if limit < 0 {
return io.ReadAll(object)
}
return io.ReadAll(io.LimitReader(object, 1024))
return io.ReadAll(io.LimitReader(object, limit))
}

@ -1,22 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package minio
type minioImageInfo struct {
NotImage bool `json:"notImage,omitempty"`
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
Format string `json:"format,omitempty"`
}

@ -0,0 +1,134 @@
package minio
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/minio/minio-go/v7"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3"
"image"
"image/gif"
"image/jpeg"
"image/png"
"net/url"
"path/filepath"
"strings"
"time"
)
func (m *Minio) getImageThumbnailURL(ctx context.Context, name string, expire time.Duration, opt *s3.Image) (string, error) {
var img image.Image
info, err := m.cache.GetImageObjectKeyInfo(ctx, name, func(ctx context.Context) (info *cache.MinioImageInfo, err error) {
info, img, err = m.getObjectImageInfo(ctx, name)
return
})
if err != nil {
return "", err
}
if !info.IsImg {
return "", errs.ErrData.Wrap("object not image")
}
if opt.Width > info.Width || opt.Width <= 0 {
opt.Width = info.Width
}
if opt.Height > info.Height || opt.Height <= 0 {
opt.Height = info.Height
}
opt.Format = strings.ToLower(opt.Format)
if opt.Format == formatJpg {
opt.Format = formatJpeg
}
switch opt.Format {
case formatPng, formatJpeg, formatGif:
default:
opt.Format = ""
}
reqParams := make(url.Values)
if opt.Width == info.Width && opt.Height == info.Height && (opt.Format == info.Format || opt.Format == "") {
reqParams.Set("response-content-type", "image/"+info.Format)
return m.PresignedGetObject(ctx, name, expire, reqParams)
}
if opt.Format == "" {
switch opt.Format {
case formatGif:
opt.Format = formatGif
case formatJpeg:
opt.Format = formatJpeg
case formatPng:
opt.Format = formatPng
default:
opt.Format = formatPng
}
}
key, err := m.cache.GetThumbnailKey(ctx, name, opt.Format, opt.Width, opt.Height, func(ctx context.Context) (string, error) {
if img == nil {
reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{})
if err != nil {
return "", err
}
defer reader.Close()
img, _, err = ImageStat(reader)
if err != nil {
return "", err
}
}
thumbnail := resizeImage(img, opt.Width, opt.Height)
buf := bytes.NewBuffer(nil)
switch opt.Format {
case formatPng:
err = png.Encode(buf, thumbnail)
case formatJpeg:
err = jpeg.Encode(buf, thumbnail, nil)
case formatGif:
err = gif.Encode(buf, thumbnail, nil)
}
cacheKey := filepath.Join(imageThumbnailPath, info.Etag, fmt.Sprintf("image_w%d_h%d.%s", opt.Width, opt.Height, opt.Format))
if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil {
return "", err
}
return cacheKey, nil
})
if err != nil {
return "", err
}
reqParams.Set("response-content-type", "image/"+opt.Format)
return m.PresignedGetObject(ctx, key, expire, reqParams)
}
func (m *Minio) getObjectImageInfo(ctx context.Context, name string) (*cache.MinioImageInfo, image.Image, error) {
fileInfo, err := m.StatObject(ctx, name)
if err != nil {
return nil, nil, err
}
if fileInfo.Size > maxImageSize {
return nil, nil, errors.New("file size too large")
}
imageData, err := m.getObjectData(ctx, name, fileInfo.Size)
if err != nil {
return nil, nil, err
}
var info cache.MinioImageInfo
imageInfo, format, err := ImageStat(bytes.NewReader(imageData))
if err == nil {
info.IsImg = true
info.Format = format
info.Width, info.Height = ImageWidthHeight(imageInfo)
} else {
info.IsImg = false
}
info.Etag = fileInfo.ETag
return &info, imageInfo, nil
}
func (m *Minio) delObjectImageInfoKey(ctx context.Context, key string, size int64) {
if size > 0 && size > maxImageSize {
return
}
if err := m.cache.DelObjectImageInfoKey(key).ExecDel(ctx); err != nil {
log.ZError(ctx, "DelObjectImageInfoKey failed", err, "key", key)
}
}

@ -63,6 +63,8 @@ type ConversationModelInterface interface {
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error)
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error)

@ -4,12 +4,14 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"google.golang.org/grpc"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {

@ -40,8 +40,8 @@ var (
// companion .gitattributes file containing 'export-subst' in this same
// directory. See also https://git-scm.com/docs/gitattributes
gitVersion string = "latest"
gitCommit string = "" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "" // state of git tree, either "clean" or "dirty"
gitCommit string = "" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "" // state of git tree, either "clean" or "dirty"
buildDate string = "1970-01-01T00:00:00Z" // build date in ISO8601 format, output of $(date -u +'%Y-%m-%dT%H:%M:%SZ')
)
)

@ -4,15 +4,15 @@ package version
// TODO: Add []string of api versions supported? It's still unclear
// how we'll want to distribute that information.
type Info struct {
Major string `json:"major,omitempty"`
Minor string `json:"minor,omitempty"`
GitVersion string `json:"gitVersion"`
Major string `json:"major,omitempty"`
Minor string `json:"minor,omitempty"`
GitVersion string `json:"gitVersion"`
GitTreeState string `json:"gitTreeState,omitempty"`
GitCommit string `json:"gitCommit,omitempty"`
BuildDate string `json:"buildDate"`
GoVersion string `json:"goVersion"`
Compiler string `json:"compiler"`
Platform string `json:"platform"`
GitCommit string `json:"gitCommit,omitempty"`
BuildDate string `json:"buildDate"`
GoVersion string `json:"goVersion"`
Compiler string `json:"compiler"`
Platform string `json:"platform"`
}
type Output struct {
@ -21,7 +21,7 @@ type Output struct {
}
type OpenIMClientVersion struct {
ClientVersion string `json:"clientVersion,omitempty" yaml:"clientVersion,omitempty"` //sdk core version
ClientVersion string `json:"clientVersion,omitempty" yaml:"clientVersion,omitempty"` //sdk core version
}
// String returns info as a human-friendly version string.

@ -13,15 +13,15 @@ func Get() Info {
// These variables typically come from -ldflags settings and in
// their absence fallback to the settings in ./base.go
return Info{
Major: gitMajor,
Minor: gitMinor,
GitVersion: gitVersion,
Major: gitMajor,
Minor: gitMinor,
GitVersion: gitVersion,
GitTreeState: gitTreeState,
GitCommit: gitCommit,
BuildDate: buildDate,
GoVersion: runtime.Version(),
Compiler: runtime.Compiler,
Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH),
GitCommit: gitCommit,
BuildDate: buildDate,
GoVersion: runtime.Version(),
Compiler: runtime.Compiler,
Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH),
}
}

Loading…
Cancel
Save