feat: add local cache for high frequency reads (#2036)

* feat: msg local cache

* feat: msg local cache

* feat: msg local cache

* feat: msg local cache

* feat: msg local cache

* feat: msg local cache

* fix: mongo

* fix: mongo

* fix: mongo

* openim.yaml

* localcache

* localcache

* localcache

* localcache

* localcache

* localcache

* localcache

* localcache

* localcache

* local cache

* local cache

* local cache

* local cache

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* feat: cache add single-flight and timing-wheel.

* feat: local cache

* feat: local cache

* feat: local cache

* feat: cache add single-flight and timing-wheel.

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: local cache

* feat: msg rpc local cache

* feat: msg rpc local cache

* feat: msg rpc local cache

* feat: msg rpc local cache

* feat: msg rpc local cache

* feat: msg rpc local cache

* refactor: refactor the code of push and optimization.

* cicd: robot automated Change

* refactor: rename cache.

* merge

* fix: refactor project dir avoid import cycle.

* update tools

* merge

* feat: conversation FindRecvMsgNotNotifyUserIDs

* feat: conversation FindRecvMsgNotNotifyUserIDs

* feat: conversation FindRecvMsgNotNotifyUserIDs

* merge

* merge the latest main

---------

Co-authored-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: withchao <withchao@users.noreply.github.com>
pull/2047/head
chao 10 months ago committed by GitHub
parent 291443dd6b
commit b9cf40034c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -535,3 +535,39 @@ prometheus:
rtcPrometheusPort: [ ${RTC_PROM_PORT} ]
thirdPrometheusPort: [ ${THIRD_PROM_PORT} ]
messageTransferPrometheusPort: [ ${MSG_TRANSFER_PROM_PORT} ] # List of ports
###################### LocalCache configuration information ######################
# topic: redis subscriber channel
# slotNum: number of slots, multiple slots can prevent too many keys from competing for a lock
# slotSize: number of slots, the number of cached keys per slot, the overall cache quantity is slotNum * slotSize
# successExpire: successful cache time seconds
# failedExpire: failed cache time seconds
# disable local caching and annotate topic, slotNum, and slotSize
localCache:
user:
topic: DELETE_CACHE_USER
slotNum: 100
slotSize: 2000
successExpire: 300
failedExpire: 5
group:
topic: DELETE_CACHE_GROUP
slotNum: 100
slotSize: 2000
successExpire: 300
failedExpire: 5
friend:
topic: DELETE_CACHE_FRIEND
slotNum: 100
slotSize: 2000
successExpire: 300
failedExpire: 5
conversation:
topic: DELETE_CACHE_CONVERSATION
slotNum: 100
slotSize: 2000
successExpire: 300
failedExpire: 5

@ -18,6 +18,7 @@ require (
github.com/minio/minio-go/v7 v7.0.67
github.com/mitchellh/mapstructure v1.5.0
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect
github.com/openimsdk/localcache v0.0.1
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/robfig/cron/v3 v3.0.1
@ -84,6 +85,7 @@ require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
@ -162,3 +164,5 @@ require (
golang.org/x/crypto v0.19.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
replace github.com/openimsdk/localcache => ./pkg/localcache

@ -16,6 +16,12 @@ cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYE
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.53 h1:PtePLTqMYRHjWSf8XIL3x5JRC3YoySTMA6tRKfbUjQY=
github.com/OpenIMSDK/protocol v0.0.53/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.29 h1:NS4PEwYl9sX3SWsMjDOLVxMo3LcTWREMr+2cjzWjcqc=
github.com/OpenIMSDK/tools v0.0.29/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA=
@ -178,6 +184,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=

@ -100,7 +100,7 @@ func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig,
// func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID
// string) cbApi.CommonCallbackResp {
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
// if !config.Config.Callback.CallbackUserOnline.Enable {
// if !config.Config.Callback.CallbackUserOnline.WithEnable {
// return callbackResp
// }
// callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{
@ -129,7 +129,7 @@ func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig,
//}
//func callbackUserOffline(operationID, userID string, platformID int, connID string) cbApi.CommonCallbackResp {
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
// if !config.Config.Callback.CallbackUserOffline.Enable {
// if !config.Config.Callback.CallbackUserOffline.WithEnable {
// return callbackResp
// }
// callbackOfflineReq := cbApi.CallbackUserOfflineReq{
@ -156,7 +156,7 @@ func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig,
//}
//func callbackUserKickOff(operationID string, userID string, platformID int) cbApi.CommonCallbackResp {
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
// if !config.Config.Callback.CallbackUserKickOff.Enable {
// if !config.Config.Callback.CallbackUserKickOff.WithEnable {
// return callbackResp
// }
// callbackUserKickOffReq := cbApi.CallbackUserKickOffReq{

@ -23,7 +23,6 @@ import (
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
@ -58,22 +57,25 @@ type Server struct {
rpcPort int
prometheusPort int
LongConnServer LongConnServer
pushTerminal []int
config *config.GlobalConfig
pushTerminal map[int]struct{}
}
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
s.LongConnServer = LongConnServer
}
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, config *config.GlobalConfig) *Server {
return &Server{
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *config.GlobalConfig) *Server {
s := &Server{
rpcPort: rpcPort,
prometheusPort: proPort,
LongConnServer: longConnServer,
pushTerminal: []int{constant.IOSPlatformID, constant.AndroidPlatformID},
config: config,
pushTerminal: make(map[int]struct{}),
config: conf,
}
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
return s
}
func (s *Server) OnlinePushMsg(
@ -127,13 +129,9 @@ func (s *Server) OnlineBatchPushOneMsg(
panic("implement me")
}
func (s *Server) SuperGroupOnlineBatchPushOneMsg(
ctx context.Context,
req *msggateway.OnlineBatchPushOneMsgReq,
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
var singleUserResults []*msggateway.SingleMsgToUserResults
for _, v := range req.PushToUserIDs {
var resp []*msggateway.SingleMsgToUserPlatform
results := &msggateway.SingleMsgToUserResults{
@ -154,23 +152,22 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(
}
userPlatform := &msggateway.SingleMsgToUserPlatform{
RecvID: v,
RecvPlatFormID: int32(client.PlatformID),
}
if !client.IsBackground ||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, req.MsgData)
if err != nil {
userPlatform.ResultCode = -2
userPlatform.ResultCode = int64(errs.ErrPushMsgErr.Code())
resp = append(resp, userPlatform)
} else {
if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
if _, ok := s.pushTerminal[client.PlatformID]; ok {
results.OnlinePush = true
resp = append(resp, userPlatform)
}
}
} else {
userPlatform.ResultCode = -3
userPlatform.ResultCode = int64(errs.ErrIOSBackgroundPushErr.Code())
resp = append(resp, userPlatform)
}
}

@ -16,6 +16,7 @@ package push
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/OpenIMSDK/protocol/constant"
pbpush "github.com/OpenIMSDK/protocol/push"
@ -25,7 +26,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/grpc"
)
@ -51,8 +51,8 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
client,
offlinePusher,
database,
localcache.NewGroupLocalCache(&groupRpcClient),
localcache.NewConversationLocalCache(&conversationRpcClient),
rpccache.NewGroupLocalCache(groupRpcClient, rdb),
rpccache.NewConversationLocalCache(conversationRpcClient, rdb),
&conversationRpcClient,
&groupRpcClient,
&msgRpcClient,

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"sync"
"github.com/OpenIMSDK/protocol/constant"
@ -36,7 +37,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
@ -49,8 +49,8 @@ type Pusher struct {
database controller.PushDatabase
discov discoveryregistry.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher
groupLocalCache *localcache.GroupLocalCache
conversationLocalCache *localcache.ConversationLocalCache
groupLocalCache *rpccache.GroupLocalCache
conversationLocalCache *rpccache.ConversationLocalCache
msgRpcClient *rpcclient.MessageRpcClient
conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient
@ -59,7 +59,7 @@ type Pusher struct {
var errNoOfflinePusher = errors.New("no offlinePusher is configured")
func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache,
groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient,
) *Pusher {
return &Pusher{

@ -48,14 +48,6 @@ type conversationServer struct {
config *config.GlobalConfig
}
func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(
ctx context.Context,
req *pbconversation.GetConversationNotReceiveMessageUserIDsReq,
) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) {
//TODO implement me
panic("implement me")
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(config)
if err != nil {
@ -96,11 +88,11 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
return resp, nil
}
func (m *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) {
func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) {
log.ZDebug(ctx, "GetSortedConversationList", "seqs", req, "userID", req.UserID)
var conversationIDs []string
if len(req.ConversationIDs) == 0 {
conversationIDs, err = m.conversationDatabase.GetConversationIDs(ctx, req.UserID)
conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
@ -108,7 +100,7 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req
conversationIDs = req.ConversationIDs
}
conversations, err := m.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs)
conversations, err := c.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
@ -116,22 +108,22 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req
return nil, errs.ErrRecordNotFound.Wrap()
}
maxSeqs, err := m.msgRpcClient.GetMaxSeqs(ctx, conversationIDs)
maxSeqs, err := c.msgRpcClient.GetMaxSeqs(ctx, conversationIDs)
if err != nil {
return nil, err
}
chatLogs, err := m.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
chatLogs, err := c.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
if err != nil {
return nil, err
}
conversationMsg, err := m.getConversationInfo(ctx, chatLogs, req.UserID)
conversationMsg, err := c.getConversationInfo(ctx, chatLogs, req.UserID)
if err != nil {
return nil, err
}
hasReadSeqs, err := m.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs)
hasReadSeqs, err := c.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
@ -163,8 +155,8 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req
UnreadTotal: unreadTotal,
}
m.conversationSort(conversation_isPinTime, resp, conversation_unreadCount, conversationMsg)
m.conversationSort(conversation_notPinTime, resp, conversation_unreadCount, conversationMsg)
c.conversationSort(conversation_isPinTime, resp, conversation_unreadCount, conversationMsg)
c.conversationSort(conversation_notPinTime, resp, conversation_unreadCount, conversationMsg)
resp.ConversationElems = utils.Paginate(resp.ConversationElems, int(req.Pagination.GetPageNumber()), int(req.Pagination.GetShowNumber()))
return resp, nil
@ -535,3 +527,11 @@ func (c *conversationServer) getConversationInfo(
}
return conversationMsg, nil
}
func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context.Context, req *pbconversation.GetConversationNotReceiveMessageUserIDsReq) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) {
userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID)
if err != nil {
return nil, err
}
return &pbconversation.GetConversationNotReceiveMessageUserIDsResp{UserIDs: userIDs}, nil
}

@ -16,26 +16,31 @@ package friend
import (
"context"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant"
pbfriend "github.com/OpenIMSDK/protocol/friend"
"github.com/OpenIMSDK/protocol/sdkws"
registry "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"google.golang.org/grpc"
)
type friendServer struct {

@ -41,7 +41,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m
if err != nil {
return nil, err
}
conversations, err := m.Conversation.GetConversations(ctx, req.UserID, conversationIDs)
conversations, err := m.ConversationLocalCache.GetConversations(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
@ -104,7 +104,7 @@ func (m *msgServer) MarkMsgsAsRead(
if hasReadSeq > maxSeq {
return nil, errs.ErrArgs.Wrap("hasReadSeq must not be bigger than maxSeq")
}
conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID)
conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID)
if err != nil {
return
}
@ -144,7 +144,7 @@ func (m *msgServer) MarkConversationAsRead(
ctx context.Context,
req *msg.MarkConversationAsReadReq,
) (resp *msg.MarkConversationAsReadResp, err error) {
conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID)
conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID)
if err != nil {
return nil, err
}

@ -44,7 +44,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
if err := authverify.CheckAccessV3(ctx, req.UserID, m.config); err != nil {
return nil, err
}
user, err := m.User.GetUserInfo(ctx, req.UserID)
user, err := m.UserLocalCache.GetUserInfo(ctx, req.UserID)
if err != nil {
return nil, err
}
@ -70,12 +70,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
}
role = user.AppMangerLevel
case constant.SuperGroupChatType:
members, err := m.Group.GetGroupMemberInfoMap(
ctx,
msgs[0].GroupID,
utils.Distinct([]string{req.UserID, msgs[0].SendID}),
true,
)
members, err := m.GroupLocalCache.GetGroupMemberInfoMap(ctx, msgs[0].GroupID, utils.Distinct([]string{req.UserID, msgs[0].SendID}))
if err != nil {
return nil, err
}

@ -97,7 +97,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
}
tagAll := utils.IsContain(constant.AtAllString, msg.AtUserIDList)
if tagAll {
memberUserIDList, err := m.Group.GetGroupMemberIDs(ctx, msg.GroupID)
memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID)
if err != nil {
log.ZWarn(ctx, "GetGroupMemberIDs", err)
return
@ -143,6 +143,7 @@ func (m *msgServer) sendMsgNotification(
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
log.ZDebug(ctx, "sendMsgSingleChat return")
if err := m.messageVerification(ctx, req); err != nil {
return nil, err
}

@ -15,6 +15,8 @@
package msg
import (
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/msg"
@ -22,7 +24,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"google.golang.org/grpc"
@ -33,12 +34,11 @@ type (
msgServer struct {
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
MsgDatabase controller.CommonMsgDatabase
Group *rpcclient.GroupRpcClient
User *rpcclient.UserRpcClient
Conversation *rpcclient.ConversationRpcClient
friend *rpcclient.FriendRpcClient
GroupLocalCache *localcache.GroupLocalCache
ConversationLocalCache *localcache.ConversationLocalCache
UserLocalCache *rpccache.UserLocalCache
FriendLocalCache *rpccache.FriendLocalCache
GroupLocalCache *rpccache.GroupLocalCache
ConversationLocalCache *rpccache.ConversationLocalCache
Handlers MessageInterceptorChain
notificationSender *rpcclient.NotificationSender
config *config.GlobalConfig
@ -84,13 +84,12 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
}
s := &msgServer{
Conversation: &conversationClient,
User: &userRpcClient,
Group: &groupRpcClient,
MsgDatabase: msgDatabase,
RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
friend: &friendRpcClient,
UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, rdb),
GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, rdb),
ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, rdb),
FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, rdb),
config: config,
}
s.notificationSender = rpcclient.NewNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))

@ -40,7 +40,7 @@ func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq
var pbUsers []*msg.ActiveUser
if len(users) > 0 {
userIDs := utils.Slice(users, func(e *unrelation.UserCount) string { return e.UserID })
userMap, err := m.User.GetUsersInfoMap(ctx, userIDs)
userMap, err := m.UserLocalCache.GetUsersInfoMap(ctx, userIDs)
if err != nil {
return nil, err
}
@ -82,7 +82,7 @@ func (m *msgServer) GetActiveGroup(ctx context.Context, req *msg.GetActiveGroupR
var pbgroups []*msg.ActiveGroup
if len(groups) > 0 {
groupIDs := utils.Slice(groups, func(e *unrelation.GroupCount) string { return e.GroupID })
resp, err := m.Group.GetGroupInfos(ctx, groupIDs, false)
resp, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs)
if err != nil {
return nil, err
}

@ -35,7 +35,7 @@ func (m *msgServer) PullMessageBySeqs(
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)
for _, seq := range req.SeqRanges {
if !msgprocessor.IsNotification(seq.ConversationID) {
conversation, err := m.Conversation.GetConversation(ctx, req.UserID, seq.ConversationID)
conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, seq.ConversationID)
if err != nil {
log.ZError(ctx, "GetConversation error", err, "conversationID", seq.ConversationID)
continue
@ -138,7 +138,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
}
}
if len(sendIDs) != 0 {
sendInfos, err := m.User.GetUsersInfo(ctx, sendIDs)
sendInfos, err := m.UserLocalCache.GetUsersInfo(ctx, sendIDs)
if err != nil {
return nil, err
}
@ -147,7 +147,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
}
}
if len(recvIDs) != 0 {
recvInfos, err := m.User.GetUsersInfo(ctx, recvIDs)
recvInfos, err := m.UserLocalCache.GetUsersInfo(ctx, recvIDs)
if err != nil {
return nil, err
}
@ -156,7 +156,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
}
}
if len(groupIDs) != 0 {
groupInfos, err := m.Group.GetGroupInfos(ctx, groupIDs, true)
groupInfos, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs)
if err != nil {
return nil, err
}

@ -16,6 +16,7 @@ package msg
import (
"context"
"github.com/OpenIMSDK/tools/log"
"math/rand"
"strconv"
"time"
@ -59,15 +60,15 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
data.MsgData.ContentType >= constant.NotificationBegin {
return nil
}
black, err := m.friend.IsBlocked(ctx, data.MsgData.SendID, data.MsgData.RecvID)
black, err := m.FriendLocalCache.IsBlack(ctx, data.MsgData.SendID, data.MsgData.RecvID)
if err != nil {
return err
}
if black {
return errs.ErrBlockedByPeer.Wrap()
}
if *m.config.MessageVerify.FriendVerify {
friend, err := m.friend.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID)
if m.config.MessageVerify.FriendVerify != nil && *m.config.MessageVerify.FriendVerify {
friend, err := m.FriendLocalCache.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID)
if err != nil {
return err
}
@ -78,7 +79,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
}
return nil
case constant.SuperGroupChatType:
groupInfo, err := m.Group.GetGroupInfoCache(ctx, data.MsgData.GroupID)
groupInfo, err := m.GroupLocalCache.GetGroupInfo(ctx, data.MsgData.GroupID)
if err != nil {
return err
}
@ -99,17 +100,17 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
data.MsgData.ContentType >= constant.NotificationBegin {
return nil
}
// memberIDs, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, data.MsgData.GroupID)
// if err != nil {
// return err
// }
// if !utils.IsContain(data.MsgData.SendID, memberIDs) {
// return errs.ErrNotInGroupYet.Wrap()
// }
memberIDs, err := m.GroupLocalCache.GetGroupMemberIDMap(ctx, data.MsgData.GroupID)
if err != nil {
return err
}
if _, ok := memberIDs[data.MsgData.SendID]; !ok {
return errs.ErrNotInGroupYet.Wrap()
}
groupMemberInfo, err := m.Group.GetGroupMemberCache(ctx, data.MsgData.GroupID, data.MsgData.SendID)
groupMemberInfo, err := m.GroupLocalCache.GetGroupMember(ctx, data.MsgData.GroupID, data.MsgData.SendID)
if err != nil {
if err == errs.ErrRecordNotFound {
if errs.ErrRecordNotFound.Is(err) {
return errs.ErrNotInGroupYet.Wrap(err.Error())
}
return err
@ -157,6 +158,9 @@ func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) {
case constant.Custom:
fallthrough
case constant.Quote:
utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, true)
utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, true)
utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, true)
case constant.Revoke:
utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
@ -187,7 +191,8 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(
sessionType int,
pb *msg.SendMsgReq,
) (bool, error) {
opt, err := m.User.GetUserGlobalMsgRecvOpt(ctx, userID)
defer log.ZDebug(ctx, "modifyMessageByUserMessageReceiveOpt return")
opt, err := m.UserLocalCache.GetUserGlobalMsgRecvOpt(ctx, userID)
if err != nil {
return false, err
}
@ -203,7 +208,7 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(
return true, nil
}
// conversationID := utils.GetConversationIDBySessionType(conversationID, sessionType)
singleOpt, err := m.Conversation.GetSingleConversationRecvMsgOpt(ctx, userID, conversationID)
singleOpt, err := m.ConversationLocalCache.GetSingleConversationRecvMsgOpt(ctx, userID, conversationID)
if errs.ErrRecordNotFound.Is(err) {
return true, nil
} else if err != nil {

@ -0,0 +1,15 @@
package cachekey
const (
BlackIDsKey = "BLACK_IDS:"
IsBlackKey = "IS_BLACK:" // local cache
)
func GetBlackIDsKey(ownerUserID string) string {
return BlackIDsKey + ownerUserID
}
func GetIsBlackIDsKey(possibleBlackUserID, userID string) string {
return IsBlackKey + userID + "-" + possibleBlackUserID
}

@ -0,0 +1,44 @@
package cachekey
const (
ConversationKey = "CONVERSATION:"
ConversationIDsKey = "CONVERSATION_IDS:"
ConversationIDsHashKey = "CONVERSATION_IDS_HASH:"
ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
RecvMsgOptKey = "RECV_MSG_OPT:"
SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
)
func GetConversationKey(ownerUserID, conversationID string) string {
return ConversationKey + ownerUserID + ":" + conversationID
}
func GetConversationIDsKey(ownerUserID string) string {
return ConversationIDsKey + ownerUserID
}
func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID
}
func GetRecvMsgOptKey(ownerUserID, conversationID string) string {
return RecvMsgOptKey + ownerUserID + ":" + conversationID
}
func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
return SuperGroupRecvMsgNotNotifyUserIDsHashKey + groupID
}
func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string {
return ConversationHasReadSeqKey + ownerUserID + ":" + conversationID
}
func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string {
return ConversationNotReceiveMessageUserIDsKey + conversationID
}
func GetUserConversationIDsHashKey(ownerUserID string) string {
return ConversationIDsHashKey + ownerUserID
}

@ -0,0 +1,24 @@
package cachekey
const (
FriendIDsKey = "FRIEND_IDS:"
TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:"
FriendKey = "FRIEND_INFO:"
IsFriendKey = "IS_FRIEND:" // local cache key
)
func GetFriendIDsKey(ownerUserID string) string {
return FriendIDsKey + ownerUserID
}
func GetTwoWayFriendsIDsKey(ownerUserID string) string {
return TwoWayFriendsIDsKey + ownerUserID
}
func GetFriendKey(ownerUserID, friendUserID string) string {
return FriendKey + ownerUserID + "-" + friendUserID
}
func GetIsFriendKey(possibleFriendUserID, userID string) string {
return IsFriendKey + possibleFriendUserID + "-" + userID
}

@ -0,0 +1,45 @@
package cachekey
import (
"strconv"
"time"
)
const (
groupExpireTime = time.Second * 60 * 60 * 12
GroupInfoKey = "GROUP_INFO:"
GroupMemberIDsKey = "GROUP_MEMBER_IDS:"
GroupMembersHashKey = "GROUP_MEMBERS_HASH2:"
GroupMemberInfoKey = "GROUP_MEMBER_INFO:"
JoinedGroupsKey = "JOIN_GROUPS_KEY:"
GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
)
func GetGroupInfoKey(groupID string) string {
return GroupInfoKey + groupID
}
func GetJoinedGroupsKey(userID string) string {
return JoinedGroupsKey + userID
}
func GetGroupMembersHashKey(groupID string) string {
return GroupMembersHashKey + groupID
}
func GetGroupMemberIDsKey(groupID string) string {
return GroupMemberIDsKey + groupID
}
func GetGroupMemberInfoKey(groupID, userID string) string {
return GroupMemberInfoKey + groupID + "-" + userID
}
func GetGroupMemberNumKey(groupID string) string {
return GroupMemberNumKey + groupID
}
func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string {
return GroupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel))
}

@ -0,0 +1,14 @@
package cachekey
const (
UserInfoKey = "USER_INFO:"
UserGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
)
func GetUserInfoKey(userID string) string {
return UserInfoKey + userID
}
func GetUserGlobalRecvMsgOptKey(userID string) string {
return UserGlobalRecvMsgOptKey + userID
}

@ -16,6 +16,7 @@ package config
import (
"bytes"
"time"
"github.com/OpenIMSDK/tools/discoveryregistry"
"gopkg.in/yaml.v3"
@ -266,6 +267,8 @@ type GlobalConfig struct {
FriendVerify *bool `yaml:"friendVerify"`
} `yaml:"messageVerify"`
LocalCache localCache `yaml:"localCache"`
IOSPush struct {
PushSound string `yaml:"pushSound"`
BadgeCount bool `yaml:"badgeCount"`
@ -382,6 +385,33 @@ type notification struct {
ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"`
}
type LocalCache struct {
Topic string `yaml:"topic"`
SlotNum int `yaml:"slotNum"`
SlotSize int `yaml:"slotSize"`
SuccessExpire int `yaml:"successExpire"` // second
FailedExpire int `yaml:"failedExpire"` // second
}
func (l LocalCache) Failed() time.Duration {
return time.Second * time.Duration(l.FailedExpire)
}
func (l LocalCache) Success() time.Duration {
return time.Second * time.Duration(l.SuccessExpire)
}
func (l LocalCache) Enable() bool {
return l.Topic != "" && l.SlotNum > 0 && l.SlotSize > 0
}
type localCache struct {
User LocalCache `yaml:"user"`
Group LocalCache `yaml:"group"`
Friend LocalCache `yaml:"friend"`
Conversation LocalCache `yaml:"conversation"`
}
func (c *GlobalConfig) GetServiceNames() []string {
return []string{
c.RpcRegisterName.OpenImUserName,

@ -16,6 +16,8 @@ package config
import (
_ "embed"
"fmt"
"gopkg.in/yaml.v3"
"reflect"
"testing"
@ -116,3 +118,13 @@ func TestInitConfig(t *testing.T) {
})
}
}
func TestName(t *testing.T) {
Config.LocalCache.Friend.Topic = "friend"
Config.LocalCache.Friend.SlotNum = 500
Config.LocalCache.Friend.SlotSize = 20000
data, _ := yaml.Marshal(&Config)
fmt.Println(string(data))
}

@ -16,6 +16,9 @@ package cache
import (
"context"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time"
"github.com/dtm-labs/rockscache"
@ -47,11 +50,15 @@ type BlackCacheRedis struct {
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB relationtb.BlackModelInterface, options rockscache.Options) BlackCache {
rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
b := config.Config.LocalCache.Friend
log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable())
mc.SetTopic(b.Topic)
mc.SetRawRedisClient(rdb)
return &BlackCacheRedis{
expireTime: blackExpireTime,
rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient),
metaCache: mc,
blackDB: blackDB,
}
}
@ -61,12 +68,12 @@ func (b *BlackCacheRedis) NewCache() BlackCache {
expireTime: b.expireTime,
rcClient: b.rcClient,
blackDB: b.blackDB,
metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...),
metaCache: b.Copy(),
}
}
func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
return blackIDsKey + ownerUserID
return cachekey.GetBlackIDsKey(ownerUserID)
}
func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) {

@ -0,0 +1,66 @@
package cache
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"strings"
"sync"
)
var (
once sync.Once
subscribe map[string][]string
)
func getPublishKey(topic string, key []string) []string {
if topic == "" || len(key) == 0 {
return nil
}
once.Do(func() {
list := []struct {
Local config.LocalCache
Keys []string
}{
{
Local: config.Config.LocalCache.User,
Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey},
},
{
Local: config.Config.LocalCache.Group,
Keys: []string{cachekey.GroupMemberIDsKey, cachekey.GroupInfoKey, cachekey.GroupMemberInfoKey},
},
{
Local: config.Config.LocalCache.Friend,
Keys: []string{cachekey.FriendIDsKey, cachekey.BlackIDsKey},
},
{
Local: config.Config.LocalCache.Conversation,
Keys: []string{cachekey.ConversationKey, cachekey.ConversationIDsKey, cachekey.ConversationNotReceiveMessageUserIDsKey},
},
}
subscribe = make(map[string][]string)
for _, v := range list {
if v.Local.Enable() {
subscribe[v.Local.Topic] = v.Keys
}
}
})
prefix, ok := subscribe[topic]
if !ok {
return nil
}
res := make([]string, 0, len(key))
for _, k := range key {
var exist bool
for _, p := range prefix {
if strings.HasPrefix(k, p) {
exist = true
break
}
}
if exist {
res = append(res, k)
}
}
return res
}

@ -16,6 +16,9 @@ package cache
import (
"context"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"math/big"
"strings"
"time"
@ -27,14 +30,14 @@ import (
)
const (
conversationKey = "CONVERSATION:"
conversationIDsKey = "CONVERSATION_IDS:"
conversationIDsHashKey = "CONVERSATION_IDS_HASH:"
conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
recvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
//conversationKey = "CONVERSATION:"
//conversationIDsKey = "CONVERSATION_IDS:"
//conversationIDsHashKey = "CONVERSATION_IDS_HASH:"
//conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
//recvMsgOptKey = "RECV_MSG_OPT:"
//superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
//superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
//conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
conversationExpireTime = time.Second * 60 * 60 * 12
)
@ -81,10 +84,14 @@ type ConversationCache interface {
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache {
rcClient := rockscache.NewClient(rdb, opts)
mc := NewMetaCacheRedis(rcClient)
c := config.Config.LocalCache.Conversation
log.ZDebug(context.Background(), "black local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable())
mc.SetTopic(c.Topic)
mc.SetRawRedisClient(rdb)
return &ConversationRedisCache{
rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient),
metaCache: mc,
conversationDB: db,
expireTime: conversationExpireTime,
}
@ -115,38 +122,42 @@ type ConversationRedisCache struct {
func (c *ConversationRedisCache) NewCache() ConversationCache {
return &ConversationRedisCache{
rcClient: c.rcClient,
metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...),
metaCache: c.Copy(),
conversationDB: c.conversationDB,
expireTime: c.expireTime,
}
}
func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string {
return conversationKey + ownerUserID + ":" + conversationID
return cachekey.GetConversationKey(ownerUserID, conversationID)
}
func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string {
return conversationIDsKey + ownerUserID
return cachekey.GetConversationIDsKey(ownerUserID)
}
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsKey + groupID
return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID)
}
func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string {
return recvMsgOptKey + ownerUserID + ":" + conversationID
return cachekey.GetRecvMsgOptKey(ownerUserID, conversationID)
}
func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID
return cachekey.GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID)
}
func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string {
return conversationHasReadSeqKey + ownerUserID + ":" + conversationID
return cachekey.GetConversationHasReadSeqKey(ownerUserID, conversationID)
}
func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string {
return conversationNotReceiveMessageUserIDsKey + conversationID
return cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID)
}
func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string {
return cachekey.GetUserConversationIDsHashKey(ownerUserID)
}
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
@ -166,10 +177,6 @@ func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) Conversat
return cache
}
func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string {
return conversationIDsHashKey + ownerUserID
}
func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
return getCache(
ctx,

@ -16,6 +16,9 @@ package cache
import (
"context"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time"
"github.com/OpenIMSDK/tools/utils"
@ -25,10 +28,10 @@ import (
)
const (
friendExpireTime = time.Second * 60 * 60 * 12
friendIDsKey = "FRIEND_IDS:"
TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:"
friendKey = "FRIEND_INFO:"
friendExpireTime = time.Second * 60 * 60 * 12
//friendIDsKey = "FRIEND_IDS:"
//TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:"
//friendKey = "FRIEND_INFO:"
)
// FriendCache is an interface for caching friend-related data.
@ -58,8 +61,13 @@ type FriendCacheRedis struct {
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface,
options rockscache.Options) FriendCache {
rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
f := config.Config.LocalCache.Friend
log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable())
mc.SetTopic(f.Topic)
mc.SetRawRedisClient(rdb)
return &FriendCacheRedis{
metaCache: NewMetaCacheRedis(rcClient),
metaCache: mc,
friendDB: friendDB,
expireTime: friendExpireTime,
rcClient: rcClient,
@ -70,7 +78,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo
func (f *FriendCacheRedis) NewCache() FriendCache {
return &FriendCacheRedis{
rcClient: f.rcClient,
metaCache: NewMetaCacheRedis(f.rcClient, f.metaCache.GetPreDelKeys()...),
metaCache: f.Copy(),
friendDB: f.friendDB,
expireTime: f.expireTime,
}
@ -78,17 +86,17 @@ func (f *FriendCacheRedis) NewCache() FriendCache {
// getFriendIDsKey returns the key for storing friend IDs in the cache.
func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string {
return friendIDsKey + ownerUserID
return cachekey.GetFriendIDsKey(ownerUserID)
}
// getTwoWayFriendsIDsKey returns the key for storing two-way friend IDs in the cache.
func (f *FriendCacheRedis) getTwoWayFriendsIDsKey(ownerUserID string) string {
return TwoWayFriendsIDsKey + ownerUserID
return cachekey.GetTwoWayFriendsIDsKey(ownerUserID)
}
// getFriendKey returns the key for storing friend info in the cache.
func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string {
return friendKey + ownerUserID + "-" + friendUserID
return cachekey.GetFriendKey(ownerUserID, friendUserID)
}
// GetFriendIDs retrieves friend IDs from the cache or the database if not found.

@ -17,7 +17,8 @@ package cache
import (
"context"
"fmt"
"strconv"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time"
"github.com/OpenIMSDK/protocol/constant"
@ -30,15 +31,14 @@ import (
)
const (
groupExpireTime = time.Second * 60 * 60 * 12
groupInfoKey = "GROUP_INFO:"
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
groupMembersHashKey = "GROUP_MEMBERS_HASH2:"
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
//groupOwnerInfoKey = "GROUP_OWNER_INFO:".
joinedGroupsKey = "JOIN_GROUPS_KEY:"
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
groupExpireTime = time.Second * 60 * 60 * 12
//groupInfoKey = "GROUP_INFO:"
//groupMemberIDsKey = "GROUP_MEMBER_IDS:"
//groupMembersHashKey = "GROUP_MEMBERS_HASH2:"
//groupMemberInfoKey = "GROUP_MEMBER_INFO:"
//joinedGroupsKey = "JOIN_GROUPS_KEY:"
//groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
//groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
)
type GroupHash interface {
@ -101,12 +101,16 @@ func NewGroupCacheRedis(
opts rockscache.Options,
) GroupCache {
rcClient := rockscache.NewClient(rdb, opts)
mc := NewMetaCacheRedis(rcClient)
g := config.Config.LocalCache.Group
mc.SetTopic(g.Topic)
log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable())
mc.SetRawRedisClient(rdb)
return &GroupCacheRedis{
rcClient: rcClient, expireTime: groupExpireTime,
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
groupHash: hashCode,
metaCache: NewMetaCacheRedis(rcClient),
metaCache: mc,
}
}
@ -117,36 +121,36 @@ func (g *GroupCacheRedis) NewCache() GroupCache {
groupDB: g.groupDB,
groupMemberDB: g.groupMemberDB,
groupRequestDB: g.groupRequestDB,
metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...),
metaCache: g.Copy(),
}
}
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
return groupInfoKey + groupID
return cachekey.GetGroupInfoKey(groupID)
}
func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string {
return joinedGroupsKey + userID
return cachekey.GetJoinedGroupsKey(userID)
}
func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string {
return groupMembersHashKey + groupID
return cachekey.GetGroupMembersHashKey(groupID)
}
func (g *GroupCacheRedis) getGroupMemberIDsKey(groupID string) string {
return groupMemberIDsKey + groupID
return cachekey.GetGroupMemberIDsKey(groupID)
}
func (g *GroupCacheRedis) getGroupMemberInfoKey(groupID, userID string) string {
return groupMemberInfoKey + groupID + "-" + userID
return cachekey.GetGroupMemberInfoKey(groupID, userID)
}
func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
return groupMemberNumKey + groupID
return cachekey.GetGroupMemberNumKey(groupID)
}
func (g *GroupCacheRedis) getGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string {
return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel))
return cachekey.GetGroupRoleLevelMemberIDsKey(groupID, roleLevel)
}
func (g *GroupCacheRedis) GetGroupIndex(group *relationtb.GroupModel, keys []string) (int, error) {

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/redis/go-redis/v9"
"fmt"
"time"
@ -43,6 +44,9 @@ type metaCache interface {
AddKeys(keys ...string)
ClearKeys()
GetPreDelKeys() []string
SetTopic(topic string)
SetRawRedisClient(cli redis.UniversalClient)
Copy() metaCache
}
func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache {
@ -50,10 +54,36 @@ func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache {
}
type metaCacheRedis struct {
topic string
rcClient *rockscache.Client
keys []string
maxRetryTimes int
retryInterval time.Duration
redisClient redis.UniversalClient
}
func (m *metaCacheRedis) Copy() metaCache {
var keys []string
if len(m.keys) > 0 {
keys = make([]string, 0, len(m.keys)*2)
keys = append(keys, m.keys...)
}
return &metaCacheRedis{
topic: m.topic,
rcClient: m.rcClient,
keys: keys,
maxRetryTimes: m.maxRetryTimes,
retryInterval: m.retryInterval,
redisClient: redisClient,
}
}
func (m *metaCacheRedis) SetTopic(topic string) {
m.topic = topic
}
func (m *metaCacheRedis) SetRawRedisClient(cli redis.UniversalClient) {
m.redisClient = cli
}
func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error {
@ -61,7 +91,7 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error {
m.keys = utils.Distinct(m.keys)
}
if len(m.keys) > 0 {
log.ZDebug(ctx, "delete cache", "keys", m.keys)
log.ZDebug(ctx, "delete cache", "topic", m.topic, "keys", m.keys)
for _, key := range m.keys {
for i := 0; i < m.maxRetryTimes; i++ {
if err := m.rcClient.TagAsDeleted(key); err != nil {
@ -71,31 +101,18 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error {
}
break
}
//retryTimes := 0
//for {
// m.rcClient.TagAsDeleted()
// if err := m.rcClient.TagAsDeletedBatch2(ctx, []string{key}); err != nil {
// if retryTimes >= m.maxRetryTimes {
// err = errs.ErrInternalServer.Wrap(
// fmt.Sprintf(
// "delete cache error: %v, keys: %v, retry times %d, please check redis server",
// err,
// key,
// retryTimes,
// ),
// )
// log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", key)
// return err
// }
// retryTimes++
// } else {
// break
// }
//}
}
if pk := getPublishKey(m.topic, m.keys); len(pk) > 0 {
data, err := json.Marshal(pk)
if err != nil {
log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", pk)
} else {
if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil {
log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", pk)
}
}
}
}
return nil
}

@ -38,12 +38,12 @@ const (
conversationUserMinSeq = "CON_USER_MIN_SEQ:"
hasReadSeq = "HAS_READ_SEQ:"
appleDeviceToken = "DEVICE_TOKEN"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FCM_TOKEN = "FCM_TOKEN:"
//appleDeviceToken = "DEVICE_TOKEN"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
//signalCache = "SIGNAL_CACHE:"
//signalListCache = "SIGNAL_LIST_CACHE:"
FCM_TOKEN = "FCM_TOKEN:"
messageCache = "MESSAGE_CACHE:"
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
@ -143,6 +143,10 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string
return hasReadSeq + userID + ":" + conversationID
}
func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return conversationUserMinSeq + conversationID + "u:" + userID
}
func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
@ -208,10 +212,6 @@ func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64,
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return conversationUserMinSeq + conversationID + "u:" + userID
}
func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {

@ -18,6 +18,8 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"hash/crc32"
"strconv"
"time"
@ -32,8 +34,8 @@ import (
)
const (
userExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:"
userExpireTime = time.Second * 60 * 60 * 12
//userInfoKey = "USER_INFO:"
userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
olineStatusKey = "ONLINE_STATUS:"
userOlineStatusExpireTime = time.Second * 60 * 60 * 24
@ -68,7 +70,11 @@ func NewUserCacheRedis(
options rockscache.Options,
) UserCache {
rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
u := config.Config.LocalCache.User
log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable())
mc.SetTopic(u.Topic)
mc.SetRawRedisClient(rdb)
return &UserCacheRedis{
rdb: rdb,
metaCache: NewMetaCacheRedis(rcClient),
@ -81,7 +87,7 @@ func NewUserCacheRedis(
func (u *UserCacheRedis) NewCache() UserCache {
return &UserCacheRedis{
rdb: u.rdb,
metaCache: NewMetaCacheRedis(u.rcClient, u.metaCache.GetPreDelKeys()...),
metaCache: u.Copy(),
userDB: u.userDB,
expireTime: u.expireTime,
rcClient: u.rcClient,
@ -89,18 +95,17 @@ func (u *UserCacheRedis) NewCache() UserCache {
}
func (u *UserCacheRedis) getUserInfoKey(userID string) string {
return userInfoKey + userID
return cachekey.GetUserInfoKey(userID)
}
func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string {
return userGlobalRecvMsgOptKey + userID
return cachekey.GetUserGlobalRecvMsgOptKey(userID)
}
func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) {
return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*relationtb.UserModel, error) {
return u.userDB.Take(ctx, userID)
},
)
})
}
func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) {

@ -1,86 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache
import (
"context"
"sync"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
type ConversationLocalCache struct {
lock sync.Mutex
superGroupRecvMsgNotNotifyUserIDs map[string]Hash
conversationIDs map[string]Hash
client *rpcclient.ConversationRpcClient
}
type Hash struct {
hash uint64
ids []string
}
func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache {
return &ConversationLocalCache{
superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
conversationIDs: make(map[string]Hash),
client: client,
}
}
func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
resp, err := g.client.Client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{
GroupID: groupID,
})
if err != nil {
return nil, err
}
return resp.UserIDs, nil
}
func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
resp, err := g.client.Client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{
OwnerUserID: userID,
})
if err != nil {
return nil, err
}
g.lock.Lock()
hash, ok := g.conversationIDs[userID]
g.lock.Unlock()
if !ok || hash.hash != resp.Hash {
conversationIDsResp, err := g.client.Client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{
UserID: userID,
})
if err != nil {
return nil, err
}
g.lock.Lock()
defer g.lock.Unlock()
g.conversationIDs[userID] = Hash{
hash: resp.Hash,
ids: conversationIDsResp.ConversationIDs,
}
return conversationIDsResp.ConversationIDs, nil
}
return hash.ids, nil
}

@ -1,15 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"

@ -1,77 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache
import (
"context"
"sync"
"github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
type GroupLocalCache struct {
lock sync.Mutex
cache map[string]GroupMemberIDsHash
client *rpcclient.GroupRpcClient
}
type GroupMemberIDsHash struct {
memberListHash uint64
userIDs []string
}
func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache {
return &GroupLocalCache{
cache: make(map[string]GroupMemberIDsHash, 0),
client: client,
}
}
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
resp, err := g.client.Client.GetGroupAbstractInfo(ctx, &group.GetGroupAbstractInfoReq{
GroupIDs: []string{groupID},
})
if err != nil {
return nil, err
}
if len(resp.GroupAbstractInfos) < 1 {
return nil, errs.ErrGroupIDNotFound
}
g.lock.Lock()
localHashInfo, ok := g.cache[groupID]
if ok && localHashInfo.memberListHash == resp.GroupAbstractInfos[0].GroupMemberListHash {
g.lock.Unlock()
return localHashInfo.userIDs, nil
}
g.lock.Unlock()
groupMembersResp, err := g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{
GroupID: groupID,
})
if err != nil {
return nil, err
}
g.lock.Lock()
defer g.lock.Unlock()
g.cache[groupID] = GroupMemberIDsHash{
memberListHash: resp.GroupAbstractInfos[0].GroupMemberListHash,
userIDs: groupMembersResp.UserIDs,
}
return g.cache[groupID].userIDs, nil
}

@ -1,15 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache

@ -96,8 +96,14 @@ func (c *ConversationMgo) FindUserIDAllConversations(ctx context.Context, userID
return mgoutil.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": userID})
}
func (c *ConversationMgo) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
return mgoutil.Find[string](ctx, c.coll, bson.M{"group_id": groupID, "recv_msg_opt": constant.ReceiveNotNotifyMessage}, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}))
func (c *ConversationMgo) FindRecvMsgUserIDs(ctx context.Context, conversationID string, recvOpts []int) ([]string, error) {
var filter any
if len(recvOpts) == 0 {
filter = bson.M{"conversation_id": conversationID}
} else {
filter = bson.M{"conversation_id": conversationID, "recv_msg_opt": bson.M{"$in": recvOpts}}
}
return mgoutil.Find[string](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}))
}
func (c *ConversationMgo) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {

@ -53,7 +53,7 @@ type ConversationModelInterface interface {
Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error)
FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error)
FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error)
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
FindRecvMsgUserIDs(ctx context.Context, conversationID string, recvOpts []int) ([]string, error)
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error)

@ -0,0 +1,16 @@
package redispubsub
import "github.com/redis/go-redis/v9"
type Publisher struct {
client redis.UniversalClient
channel string
}
func NewPublisher(client redis.UniversalClient, channel string) *Publisher {
return &Publisher{client: client, channel: channel}
}
func (p *Publisher) Publish(message string) error {
return p.client.Publish(ctx, p.channel, message).Err()
}

@ -0,0 +1,34 @@
package redispubsub
import (
"context"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
type Subscriber struct {
client redis.UniversalClient
channel string
}
func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber {
return &Subscriber{client: client, channel: channel}
}
func (s *Subscriber) OnMessage(ctx context.Context, callback func(string)) error {
messageChannel := s.client.Subscribe(ctx, s.channel).Channel()
go func() {
for {
select {
case <-ctx.Done():
return
case msg := <-messageChannel:
callback(msg.Payload)
}
}
}()
return nil
}

@ -0,0 +1,112 @@
package localcache
import (
"context"
"github.com/openimsdk/localcache/link"
"github.com/openimsdk/localcache/lru"
"hash/fnv"
"unsafe"
)
type Cache[V any] interface {
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error)
GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error)
Del(ctx context.Context, key ...string)
DelLocal(ctx context.Context, key ...string)
Stop()
}
func New[V any](opts ...Option) Cache[V] {
opt := defaultOption()
for _, o := range opts {
o(opt)
}
c := cache[V]{opt: opt}
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
createSimpleLRU := func() lru.LRU[string, V] {
if opt.expirationEvict {
return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
} else {
return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
}
}
if opt.localSlotNum == 1 {
c.local = createSimpleLRU()
} else {
c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, func(key string) uint64 {
h := fnv.New64a()
h.Write(*(*[]byte)(unsafe.Pointer(&key)))
return h.Sum64()
}, createSimpleLRU)
}
if opt.linkSlotNum > 0 {
c.link = link.New(opt.linkSlotNum)
}
}
return &c
}
type cache[V any] struct {
opt *option
link link.Link
local lru.LRU[string, V]
}
func (c *cache[V]) onEvict(key string, value V) {
if c.link != nil {
lks := c.link.Del(key)
for k := range lks {
if key != k { // prevent deadlock
c.local.Del(k)
}
}
}
}
func (c *cache[V]) del(key ...string) {
if c.local == nil {
return
}
for _, k := range key {
c.local.Del(k)
if c.link != nil {
lks := c.link.Del(k)
for k := range lks {
c.local.Del(k)
}
}
}
}
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) {
return c.GetLink(ctx, key, fetch)
}
func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) {
if c.local != nil {
return c.local.Get(key, func() (V, error) {
if len(link) > 0 {
c.link.Link(key, link...)
}
return fetch(ctx)
})
} else {
return fetch(ctx)
}
}
func (c *cache[V]) Del(ctx context.Context, key ...string) {
for _, fn := range c.opt.delFn {
fn(ctx, key...)
}
c.del(key...)
}
func (c *cache[V]) DelLocal(ctx context.Context, key ...string) {
c.del(key...)
}
func (c *cache[V]) Stop() {
c.local.Stop()
}

@ -0,0 +1,79 @@
package localcache
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestName(t *testing.T) {
c := New[string](WithExpirationEvict())
//c := New[string]()
ctx := context.Background()
const (
num = 10000
tNum = 10000
kNum = 100000
pNum = 100
)
getKey := func(v uint64) string {
return fmt.Sprintf("key_%d", v%kNum)
}
start := time.Now()
t.Log("start", start)
var (
get atomic.Int64
del atomic.Int64
)
incrGet := func() {
if v := get.Add(1); v%pNum == 0 {
//t.Log("#get count", v/pNum)
}
}
incrDel := func() {
if v := del.Add(1); v%pNum == 0 {
//t.Log("@del count", v/pNum)
}
}
var wg sync.WaitGroup
for i := 0; i < tNum; i++ {
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < num; i++ {
c.Get(ctx, getKey(rand.Uint64()), func(ctx context.Context) (string, error) {
return fmt.Sprintf("index_%d", i), nil
})
incrGet()
}
}()
go func() {
defer wg.Done()
time.Sleep(time.Second / 10)
for i := 0; i < num; i++ {
c.Del(ctx, getKey(rand.Uint64()))
incrDel()
}
}()
}
wg.Wait()
end := time.Now()
t.Log("end", end)
t.Log("time", end.Sub(start))
t.Log("get", get.Load())
t.Log("del", del.Load())
// 137.35s
}

@ -0,0 +1,5 @@
module github.com/openimsdk/localcache
go 1.19
require github.com/hashicorp/golang-lru/v2 v2.0.7

@ -0,0 +1,109 @@
package link
import (
"hash/fnv"
"sync"
"unsafe"
)
type Link interface {
Link(key string, link ...string)
Del(key string) map[string]struct{}
}
func newLinkKey() *linkKey {
return &linkKey{
data: make(map[string]map[string]struct{}),
}
}
type linkKey struct {
lock sync.Mutex
data map[string]map[string]struct{}
}
func (x *linkKey) link(key string, link ...string) {
x.lock.Lock()
defer x.lock.Unlock()
v, ok := x.data[key]
if !ok {
v = make(map[string]struct{})
x.data[key] = v
}
for _, k := range link {
v[k] = struct{}{}
}
}
func (x *linkKey) del(key string) map[string]struct{} {
x.lock.Lock()
defer x.lock.Unlock()
ks, ok := x.data[key]
if !ok {
return nil
}
delete(x.data, key)
return ks
}
func New(n int) Link {
if n <= 0 {
panic("must be greater than 0")
}
slots := make([]*linkKey, n)
for i := 0; i < len(slots); i++ {
slots[i] = newLinkKey()
}
return &slot{
n: uint64(n),
slots: slots,
}
}
type slot struct {
n uint64
slots []*linkKey
}
func (x *slot) index(s string) uint64 {
h := fnv.New64a()
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s)))
return h.Sum64() % x.n
}
func (x *slot) Link(key string, link ...string) {
if len(link) == 0 {
return
}
mk := key
lks := make([]string, len(link))
for i, k := range link {
lks[i] = k
}
x.slots[x.index(mk)].link(mk, lks...)
for _, lk := range lks {
x.slots[x.index(lk)].link(lk, mk)
}
}
func (x *slot) Del(key string) map[string]struct{} {
return x.delKey(key)
}
func (x *slot) delKey(k string) map[string]struct{} {
del := make(map[string]struct{})
stack := []string{k}
for len(stack) > 0 {
curr := stack[len(stack)-1]
stack = stack[:len(stack)-1]
if _, ok := del[curr]; ok {
continue
}
del[curr] = struct{}{}
childKeys := x.slots[x.index(curr)].del(curr)
for ck := range childKeys {
stack = append(stack, ck)
}
}
return del
}

@ -0,0 +1,20 @@
package link
import (
"testing"
)
func TestName(t *testing.T) {
v := New(1)
//v.Link("a:1", "b:1", "c:1", "d:1")
v.Link("a:1", "b:1", "c:1")
v.Link("z:1", "b:1")
//v.DelKey("a:1")
v.Del("z:1")
t.Log(v)
}

@ -0,0 +1,20 @@
package lru
import "github.com/hashicorp/golang-lru/v2/simplelru"
type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V]
type LRU[K comparable, V any] interface {
Get(key K, fetch func() (V, error)) (V, error)
Del(key K) bool
Stop()
}
type Target interface {
IncrGetHit()
IncrGetSuccess()
IncrGetFailed()
IncrDelHit()
IncrDelNotFound()
}

@ -0,0 +1,78 @@
package lru
import (
"github.com/hashicorp/golang-lru/v2/expirable"
"sync"
"time"
)
func NewExpirationLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] {
var cb expirable.EvictCallback[K, *expirationLruItem[V]]
if onEvict != nil {
cb = func(key K, value *expirationLruItem[V]) {
onEvict(key, value.value)
}
}
core := expirable.NewLRU[K, *expirationLruItem[V]](size, cb, successTTL)
return &ExpirationLRU[K, V]{
core: core,
successTTL: successTTL,
failedTTL: failedTTL,
target: target,
}
}
type expirationLruItem[V any] struct {
lock sync.RWMutex
err error
value V
}
type ExpirationLRU[K comparable, V any] struct {
lock sync.Mutex
core *expirable.LRU[K, *expirationLruItem[V]]
successTTL time.Duration
failedTTL time.Duration
target Target
}
func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock()
v, ok := x.core.Get(key)
if ok {
x.lock.Unlock()
x.target.IncrGetSuccess()
v.lock.RLock()
defer v.lock.RUnlock()
return v.value, v.err
} else {
v = &expirationLruItem[V]{}
x.core.Add(key, v)
v.lock.Lock()
x.lock.Unlock()
defer v.lock.Unlock()
v.value, v.err = fetch()
if v.err == nil {
x.target.IncrGetSuccess()
} else {
x.target.IncrGetFailed()
x.core.Remove(key)
}
return v.value, v.err
}
}
func (x *ExpirationLRU[K, V]) Del(key K) bool {
x.lock.Lock()
ok := x.core.Remove(key)
x.lock.Unlock()
if ok {
x.target.IncrDelHit()
} else {
x.target.IncrDelNotFound()
}
return ok
}
func (x *ExpirationLRU[K, V]) Stop() {
}

@ -0,0 +1,90 @@
package lru
import (
"github.com/hashicorp/golang-lru/v2/simplelru"
"sync"
"time"
)
type layLruItem[V any] struct {
lock sync.Mutex
expires int64
err error
value V
}
func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] {
var cb simplelru.EvictCallback[K, *layLruItem[V]]
if onEvict != nil {
cb = func(key K, value *layLruItem[V]) {
onEvict(key, value.value)
}
}
core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb)
if err != nil {
panic(err)
}
return &LayLRU[K, V]{
core: core,
successTTL: successTTL,
failedTTL: failedTTL,
target: target,
}
}
type LayLRU[K comparable, V any] struct {
lock sync.Mutex
core *simplelru.LRU[K, *layLruItem[V]]
successTTL time.Duration
failedTTL time.Duration
target Target
}
func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock()
v, ok := x.core.Get(key)
if ok {
x.lock.Unlock()
v.lock.Lock()
expires, value, err := v.expires, v.value, v.err
if expires != 0 && expires > time.Now().UnixMilli() {
v.lock.Unlock()
x.target.IncrGetHit()
return value, err
}
} else {
v = &layLruItem[V]{}
x.core.Add(key, v)
v.lock.Lock()
x.lock.Unlock()
}
defer v.lock.Unlock()
if v.expires > time.Now().UnixMilli() {
return v.value, v.err
}
v.value, v.err = fetch()
if v.err == nil {
v.expires = time.Now().Add(x.successTTL).UnixMilli()
x.target.IncrGetSuccess()
} else {
v.expires = time.Now().Add(x.failedTTL).UnixMilli()
x.target.IncrGetFailed()
}
return v.value, v.err
}
func (x *LayLRU[K, V]) Del(key K) bool {
x.lock.Lock()
ok := x.core.Remove(key)
x.lock.Unlock()
if ok {
x.target.IncrDelHit()
} else {
x.target.IncrDelNotFound()
}
return ok
}
func (x *LayLRU[K, V]) Stop() {
}

@ -0,0 +1,104 @@
package lru
import (
"fmt"
"hash/fnv"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
)
type cacheTarget struct {
getHit int64
getSuccess int64
getFailed int64
delHit int64
delNotFound int64
}
func (r *cacheTarget) IncrGetHit() {
atomic.AddInt64(&r.getHit, 1)
}
func (r *cacheTarget) IncrGetSuccess() {
atomic.AddInt64(&r.getSuccess, 1)
}
func (r *cacheTarget) IncrGetFailed() {
atomic.AddInt64(&r.getFailed, 1)
}
func (r *cacheTarget) IncrDelHit() {
atomic.AddInt64(&r.delHit, 1)
}
func (r *cacheTarget) IncrDelNotFound() {
atomic.AddInt64(&r.delNotFound, 1)
}
func (r *cacheTarget) String() string {
return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound)
}
func TestName(t *testing.T) {
target := &cacheTarget{}
l := NewSlotLRU[string, string](100, func(k string) uint64 {
h := fnv.New64a()
h.Write(*(*[]byte)(unsafe.Pointer(&k)))
return h.Sum64()
}, func() LRU[string, string] {
return NewExpirationLRU[string, string](100, time.Second*60, time.Second, target, nil)
})
//l := NewInertiaLRU[string, string](1000, time.Second*20, time.Second*5, target)
fn := func(key string, n int, fetch func() (string, error)) {
for i := 0; i < n; i++ {
//v, err := l.Get(key, fetch)
//if err == nil {
// t.Log("key", key, "value", v)
//} else {
// t.Error("key", key, err)
//}
v, err := l.Get(key, fetch)
//time.Sleep(time.Second / 100)
func(v ...any) {}(v, err)
}
}
tmp := make(map[string]struct{})
var wg sync.WaitGroup
for i := 0; i < 10000; i++ {
wg.Add(1)
key := fmt.Sprintf("key_%d", i%200)
tmp[key] = struct{}{}
go func() {
defer wg.Done()
//t.Log(key)
fn(key, 10000, func() (string, error) {
//time.Sleep(time.Second * 3)
//t.Log(time.Now(), "key", key, "fetch")
//if rand.Uint32()%5 == 0 {
// return "value_" + key, nil
//}
//return "", errors.New("rand error")
return "value_" + key, nil
})
}()
//wg.Add(1)
//go func() {
// defer wg.Done()
// for i := 0; i < 10; i++ {
// l.Del(key)
// time.Sleep(time.Second / 3)
// }
//}()
}
wg.Wait()
t.Log(len(tmp))
t.Log(target.String())
}

@ -0,0 +1,37 @@
package lru
func NewSlotLRU[K comparable, V any](slotNum int, hash func(K) uint64, create func() LRU[K, V]) LRU[K, V] {
x := &slotLRU[K, V]{
n: uint64(slotNum),
slots: make([]LRU[K, V], slotNum),
hash: hash,
}
for i := 0; i < slotNum; i++ {
x.slots[i] = create()
}
return x
}
type slotLRU[K comparable, V any] struct {
n uint64
slots []LRU[K, V]
hash func(k K) uint64
}
func (x *slotLRU[K, V]) getIndex(k K) uint64 {
return x.hash(k) % x.n
}
func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return x.slots[x.getIndex(key)].Get(key, fetch)
}
func (x *slotLRU[K, V]) Del(key K) bool {
return x.slots[x.getIndex(key)].Del(key)
}
func (x *slotLRU[K, V]) Stop() {
for _, slot := range x.slots {
slot.Stop()
}
}

@ -0,0 +1,121 @@
package localcache
import (
"context"
"github.com/openimsdk/localcache/lru"
"time"
)
func defaultOption() *option {
return &option{
localSlotNum: 500,
localSlotSize: 20000,
linkSlotNum: 500,
expirationEvict: false,
localSuccessTTL: time.Minute,
localFailedTTL: time.Second * 5,
delFn: make([]func(ctx context.Context, key ...string), 0, 2),
target: emptyTarget{},
}
}
type option struct {
localSlotNum int
localSlotSize int
linkSlotNum int
// expirationEvict: true means that the cache will be actively cleared when the timer expires,
// false means that the cache will be lazily deleted.
expirationEvict bool
localSuccessTTL time.Duration
localFailedTTL time.Duration
delFn []func(ctx context.Context, key ...string)
target lru.Target
}
type Option func(o *option)
func WithExpirationEvict() Option {
return func(o *option) {
o.expirationEvict = true
}
}
func WithLazy() Option {
return func(o *option) {
o.expirationEvict = false
}
}
func WithLocalDisable() Option {
return WithLinkSlotNum(0)
}
func WithLinkDisable() Option {
return WithLinkSlotNum(0)
}
func WithLinkSlotNum(linkSlotNum int) Option {
return func(o *option) {
o.linkSlotNum = linkSlotNum
}
}
func WithLocalSlotNum(localSlotNum int) Option {
return func(o *option) {
o.localSlotNum = localSlotNum
}
}
func WithLocalSlotSize(localSlotSize int) Option {
return func(o *option) {
o.localSlotSize = localSlotSize
}
}
func WithLocalSuccessTTL(localSuccessTTL time.Duration) Option {
if localSuccessTTL < 0 {
panic("localSuccessTTL should be greater than 0")
}
return func(o *option) {
o.localSuccessTTL = localSuccessTTL
}
}
func WithLocalFailedTTL(localFailedTTL time.Duration) Option {
if localFailedTTL < 0 {
panic("localFailedTTL should be greater than 0")
}
return func(o *option) {
o.localFailedTTL = localFailedTTL
}
}
func WithTarget(target lru.Target) Option {
if target == nil {
panic("target should not be nil")
}
return func(o *option) {
o.target = target
}
}
func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option {
if fn == nil {
panic("fn should not be nil")
}
return func(o *option) {
o.delFn = append(o.delFn, fn)
}
}
type emptyTarget struct{}
func (e emptyTarget) IncrGetHit() {}
func (e emptyTarget) IncrGetSuccess() {}
func (e emptyTarget) IncrGetFailed() {}
func (e emptyTarget) IncrDelHit() {}
func (e emptyTarget) IncrDelNotFound() {}

@ -0,0 +1,9 @@
package localcache
func AnyValue[V any](v any, err error) (V, error) {
if err != nil {
var zero V
return zero, err
}
return v.(V), nil
}

@ -0,0 +1,20 @@
package rpccache
func newListMap[V comparable](values []V, err error) (*listMap[V], error) {
if err != nil {
return nil, err
}
lm := &listMap[V]{
List: values,
Map: make(map[V]struct{}, len(values)),
}
for _, value := range values {
lm.Map[value] = struct{}{}
}
return lm, nil
}
type listMap[V comparable] struct {
List []V
Map map[V]struct{}
}

@ -0,0 +1,112 @@
package rpccache
import (
"context"
pbconversation "github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
)
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache {
lc := config.Config.LocalCache.Conversation
log.ZDebug(context.Background(), "ConversationLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &ConversationLocalCache{
client: client,
local: localcache.New[any](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
localcache.WithLocalSuccessTTL(lc.Success()),
localcache.WithLocalFailedTTL(lc.Failed()),
),
}
if lc.Enable() {
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
}
return x
}
type ConversationLocalCache struct {
client rpcclient.ConversationRpcClient
local localcache.Cache[any]
}
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs req", "ownerUserID", ownerUserID)
defer func() {
if err == nil {
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs return", "value", val)
} else {
log.ZError(ctx, "ConversationLocalCache GetConversationIDs return", err)
}
}()
return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs rpc", "ownerUserID", ownerUserID)
return c.client.GetConversationIDs(ctx, ownerUserID)
}))
}
func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, conversationID string) (val *pbconversation.Conversation, err error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversation req", "userID", userID, "conversationID", conversationID)
defer func() {
if err == nil {
log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "value", val)
} else {
log.ZError(ctx, "ConversationLocalCache GetConversation return", err)
}
}()
return localcache.AnyValue[*pbconversation.Conversation](c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversation rpc", "userID", userID, "conversationID", conversationID)
return c.client.GetConversation(ctx, userID, conversationID)
}))
}
func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) {
conv, err := c.GetConversation(ctx, userID, conversationID)
if err != nil {
return 0, err
}
return conv.RecvMsgOpt, nil
}
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs))
for _, conversationID := range conversationIDs {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
}
return nil, err
}
conversations = append(conversations, conversation)
}
return conversations, nil
}
func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (*listMap[string], error) {
return localcache.AnyValue[*listMap[string]](c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) (any, error) {
return newListMap(c.client.GetConversationNotReceiveMessageUserIDs(ctx, conversationID))
}))
}
func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID)
if err != nil {
return nil, err
}
return res.List, nil
}
func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx context.Context, conversationID string) (map[string]struct{}, error) {
res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID)
if err != nil {
return nil, err
}
return res.Map, nil
}

@ -0,0 +1,66 @@
package rpccache
import (
"context"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
)
func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache {
lc := config.Config.LocalCache.Friend
log.ZDebug(context.Background(), "FriendLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &FriendLocalCache{
client: client,
local: localcache.New[any](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
localcache.WithLocalSuccessTTL(lc.Success()),
localcache.WithLocalFailedTTL(lc.Failed()),
),
}
if lc.Enable() {
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
}
return x
}
type FriendLocalCache struct {
client rpcclient.FriendRpcClient
local localcache.Cache[any]
}
func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) {
log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "FriendLocalCache IsFriend return", "value", val)
} else {
log.ZError(ctx, "FriendLocalCache IsFriend return", err)
}
}()
return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "FriendLocalCache IsFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
return f.client.IsFriend(ctx, possibleFriendUserID, userID)
}, cachekey.GetFriendIDsKey(possibleFriendUserID)))
}
// IsBlack possibleBlackUserID selfUserID
func (f *FriendLocalCache) IsBlack(ctx context.Context, possibleBlackUserID, userID string) (val bool, err error) {
log.ZDebug(ctx, "FriendLocalCache IsBlack req", "possibleBlackUserID", possibleBlackUserID, "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "FriendLocalCache IsBlack return", "value", val)
} else {
log.ZError(ctx, "FriendLocalCache IsBlack return", err)
}
}()
return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID)
return f.client.IsBlack(ctx, possibleBlackUserID, userID)
}, cachekey.GetBlackIDsKey(userID)))
}

@ -0,0 +1,143 @@
package rpccache
import (
"context"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
)
func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache {
lc := config.Config.LocalCache.Group
log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &GroupLocalCache{
client: client,
local: localcache.New[any](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
localcache.WithLocalSuccessTTL(lc.Success()),
localcache.WithLocalFailedTTL(lc.Failed()),
),
}
if lc.Enable() {
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
}
return x
}
type GroupLocalCache struct {
client rpcclient.GroupRpcClient
local localcache.Cache[any]
}
func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val)
} else {
log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err)
}
}()
return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID)
return newListMap(g.client.GetGroupMemberIDs(ctx, groupID))
}))
}
func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID string) (val *sdkws.GroupMemberFullInfo, err error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID, "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val)
} else {
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err)
}
}()
return localcache.AnyValue[*sdkws.GroupMemberFullInfo](g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID)
return g.client.GetGroupMemberCache(ctx, groupID, userID)
}))
}
func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val *sdkws.GroupInfo, err error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val)
} else {
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err)
}
}()
return localcache.AnyValue[*sdkws.GroupInfo](g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID)
return g.client.GetGroupInfoCache(ctx, groupID)
}))
}
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
res, err := g.getGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
return res.List, nil
}
func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) {
res, err := g.getGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
return res.Map, nil
}
func (g *GroupLocalCache) GetGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) {
groupInfos := make([]*sdkws.GroupInfo, 0, len(groupIDs))
for _, groupID := range groupIDs {
groupInfo, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
}
return nil, err
}
groupInfos = append(groupInfos, groupInfo)
}
return groupInfos, nil
}
func (g *GroupLocalCache) GetGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
members := make([]*sdkws.GroupMemberFullInfo, 0, len(userIDs))
for _, userID := range userIDs {
member, err := g.GetGroupMember(ctx, groupID, userID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
}
return nil, err
}
members = append(members, member)
}
return members, nil
}
func (g *GroupLocalCache) GetGroupMemberInfoMap(ctx context.Context, groupID string, userIDs []string) (map[string]*sdkws.GroupMemberFullInfo, error) {
members := make(map[string]*sdkws.GroupMemberFullInfo)
for _, userID := range userIDs {
member, err := g.GetGroupMember(ctx, groupID, userID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
}
return nil, err
}
members[userID] = member
}
return members, nil
}

@ -0,0 +1,23 @@
package rpccache
import (
"context"
"encoding/json"
"github.com/OpenIMSDK/tools/log"
"github.com/redis/go-redis/v9"
)
func subscriberRedisDeleteCache(ctx context.Context, client redis.UniversalClient, channel string, del func(ctx context.Context, key ...string)) {
for message := range client.Subscribe(ctx, channel).Channel() {
log.ZDebug(ctx, "subscriberRedisDeleteCache", "channel", channel, "payload", message.Payload)
var keys []string
if err := json.Unmarshal([]byte(message.Payload), &keys); err != nil {
log.ZError(ctx, "subscriberRedisDeleteCache json.Unmarshal error", err)
continue
}
if len(keys) == 0 {
continue
}
del(ctx, keys...)
}
}

@ -0,0 +1,97 @@
package rpccache
import (
"context"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
)
func NewUserLocalCache(client rpcclient.UserRpcClient, cli redis.UniversalClient) *UserLocalCache {
lc := config.Config.LocalCache.User
log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &UserLocalCache{
client: client,
local: localcache.New[any](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
localcache.WithLocalSuccessTTL(lc.Success()),
localcache.WithLocalFailedTTL(lc.Failed()),
),
}
if lc.Enable() {
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
}
return x
}
type UserLocalCache struct {
client rpcclient.UserRpcClient
local localcache.Cache[any]
}
func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *sdkws.UserInfo, err error) {
log.ZDebug(ctx, "UserLocalCache GetUserInfo req", "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "UserLocalCache GetUserInfo return", "value", val)
} else {
log.ZError(ctx, "UserLocalCache GetUserInfo return", err)
}
}()
return localcache.AnyValue[*sdkws.UserInfo](u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "UserLocalCache GetUserInfo rpc", "userID", userID)
return u.client.GetUserInfo(ctx, userID)
}))
}
func (u *UserLocalCache) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (val int32, err error) {
log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt req", "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", "value", val)
} else {
log.ZError(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", err)
}
}()
return localcache.AnyValue[int32](u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID)
return u.client.GetUserGlobalMsgRecvOpt(ctx, userID)
}))
}
func (u *UserLocalCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) {
users := make([]*sdkws.UserInfo, 0, len(userIDs))
for _, userID := range userIDs {
user, err := u.GetUserInfo(ctx, userID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
}
return nil, err
}
users = append(users, user)
}
return users, nil
}
func (u *UserLocalCache) GetUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) {
users := make(map[string]*sdkws.UserInfo, len(userIDs))
for _, userID := range userIDs {
user, err := u.GetUserInfo(ctx, userID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
}
return nil, err
}
users[userID] = user
}
return users, nil
}

@ -114,6 +114,14 @@ func (c *ConversationRpcClient) GetConversationsByConversationID(ctx context.Con
return resp.Conversations, nil
}
func (c *ConversationRpcClient) GetConversationOfflinePushUserIDs(ctx context.Context, conversationID string, userIDs []string) ([]string, error) {
resp, err := c.Client.GetConversationOfflinePushUserIDs(ctx, &pbconversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationID, UserIDs: userIDs})
if err != nil {
return nil, err
}
return resp.UserIDs, nil
}
func (c *ConversationRpcClient) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
if len(conversationIDs) == 0 {
return nil, nil
@ -127,3 +135,11 @@ func (c *ConversationRpcClient) GetConversations(ctx context.Context, ownerUserI
}
return resp.Conversations, nil
}
func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
resp, err := c.Client.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID})
if err != nil {
return nil, err
}
return resp.UserIDs, nil
}

@ -80,7 +80,7 @@ func (f *FriendRpcClient) GetFriendIDs(ctx context.Context, ownerUserID string)
return resp.FriendIDs, nil
}
func (b *FriendRpcClient) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) {
func (b *FriendRpcClient) IsBlack(ctx context.Context, possibleBlackUserID, userID string) (bool, error) {
r, err := b.Client.IsBlack(ctx, &friend.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID})
if err != nil {
return false, err

@ -26,11 +26,9 @@ import (
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"google.golang.org/grpc"
)
type Group struct {
conn grpc.ClientConnInterface
Client group.GroupClient
discov discoveryregistry.SvcDiscoveryRegistry
Config *config.GlobalConfig
@ -42,7 +40,7 @@ func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, config *config.Glob
util.ExitWithError(err)
}
client := group.NewGroupClient(conn)
return &Group{discov: discov, conn: conn, Client: client, Config: config}
return &Group{discov: discov, Client: client, Config: config}
}
type GroupRpcClient Group

Loading…
Cancel
Save