From c16f17582a7d6735895306cebcf7a012c371d68e Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 17 Jan 2024 16:37:26 +0800 Subject: [PATCH] feat: msg rpc local cache --- deployments/templates/openim.yaml | 9 ++- internal/rpc/msg/as_read.go | 6 +- internal/rpc/msg/revoke.go | 9 +-- internal/rpc/msg/send.go | 2 +- internal/rpc/msg/server.go | 6 +- internal/rpc/msg/statistics.go | 4 +- internal/rpc/msg/sync_msg.go | 8 +-- internal/rpc/msg/verify.go | 8 +-- pkg/common/cachekey/user.go | 8 +-- pkg/common/config/config.go | 3 +- pkg/common/db/cache/black.go | 5 +- pkg/common/db/cache/config.go | 6 +- pkg/common/db/cache/conversation.go | 5 +- pkg/common/db/cache/friend.go | 2 +- pkg/common/db/cache/group.go | 4 +- pkg/common/db/cache/user.go | 9 ++- pkg/rpccache/conversation.go | 16 +++++ pkg/rpccache/group.go | 77 +++++++++++++++++++++++ pkg/rpccache/user.go | 94 +++++++++++++++++++++++++++++ 19 files changed, 242 insertions(+), 39 deletions(-) create mode 100644 pkg/rpccache/user.go diff --git a/deployments/templates/openim.yaml b/deployments/templates/openim.yaml index 3a416a806..b7548ffce 100644 --- a/deployments/templates/openim.yaml +++ b/deployments/templates/openim.yaml @@ -529,8 +529,8 @@ prometheus: messageTransferPrometheusPort: [ ${MSG_TRANSFER_PROM_PORT} ] # List of ports localCache: - friend: - topic: delete_cache_friend + user: + topic: delete_cache_user slotNum: 500 slotSize: 20000 @@ -539,6 +539,11 @@ localCache: slotNum: 500 slotSize: 20000 + friend: + topic: delete_cache_friend + slotNum: 500 + slotSize: 20000 + conversation: topic: delete_cache_conversation slotNum: 500 diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 71e038b39..a2bcb84bf 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -43,7 +43,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m if err != nil { return nil, err } - conversations, err := m.Conversation.GetConversations(ctx, req.UserID, conversationIDs) + conversations, err := m.ConversationLocalCache.GetConversations(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } @@ -106,7 +106,7 @@ func (m *msgServer) MarkMsgsAsRead( if hasReadSeq > maxSeq { return nil, errs.ErrArgs.Wrap("hasReadSeq must not be bigger than maxSeq") } - conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) + conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return } @@ -135,7 +135,7 @@ func (m *msgServer) MarkConversationAsRead( ctx context.Context, req *msg.MarkConversationAsReadReq, ) (resp *msg.MarkConversationAsReadResp, err error) { - conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) + conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return nil, err } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index d7362d339..8640524ec 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -47,7 +47,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if err := authverify.CheckAccessV3(ctx, req.UserID); err != nil { return nil, err } - user, err := m.User.GetUserInfo(ctx, req.UserID) + user, err := m.UserLocalCache.GetUserInfo(ctx, req.UserID) if err != nil { return nil, err } @@ -73,12 +73,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. } role = user.AppMangerLevel case constant.SuperGroupChatType: - members, err := m.Group.GetGroupMemberInfoMap( - ctx, - msgs[0].GroupID, - utils.Distinct([]string{req.UserID, msgs[0].SendID}), - true, - ) + members, err := m.GroupLocalCache.GetGroupMemberInfoMap(ctx, msgs[0].GroupID, utils.Distinct([]string{req.UserID, msgs[0].SendID})) if err != nil { return nil, err } diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index b1052b192..20e3f85f1 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -98,7 +98,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa } tagAll := utils.IsContain(constant.AtAllString, msg.AtUserIDList) if tagAll { - memberUserIDList, err := m.Group.GetGroupMemberIDs(ctx, msg.GroupID) + memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID) if err != nil { log.ZWarn(ctx, "GetGroupMemberIDs", err) return diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 798ba0ee6..e1593443d 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -36,9 +36,8 @@ type ( msgServer struct { RegisterCenter discoveryregistry.SvcDiscoveryRegistry MsgDatabase controller.CommonMsgDatabase - Group *rpcclient.GroupRpcClient - User *rpcclient.UserRpcClient Conversation *rpcclient.ConversationRpcClient + UserLocalCache *rpccache.UserLocalCache FriendLocalCache *rpccache.FriendLocalCache GroupLocalCache *rpccache.GroupLocalCache ConversationLocalCache *rpccache.ConversationLocalCache @@ -84,10 +83,9 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e s := &msgServer{ Conversation: &conversationClient, - User: &userRpcClient, - Group: &groupRpcClient, MsgDatabase: msgDatabase, RegisterCenter: client, + UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, rdb), GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, rdb), ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, rdb), FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, rdb), diff --git a/internal/rpc/msg/statistics.go b/internal/rpc/msg/statistics.go index ac09e3f69..620e6c7b0 100644 --- a/internal/rpc/msg/statistics.go +++ b/internal/rpc/msg/statistics.go @@ -41,7 +41,7 @@ func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq var pbUsers []*msg.ActiveUser if len(users) > 0 { userIDs := utils.Slice(users, func(e *unrelation.UserCount) string { return e.UserID }) - userMap, err := m.User.GetUsersInfoMap(ctx, userIDs) + userMap, err := m.UserLocalCache.GetUsersInfoMap(ctx, userIDs) if err != nil { return nil, err } @@ -83,7 +83,7 @@ func (m *msgServer) GetActiveGroup(ctx context.Context, req *msg.GetActiveGroupR var pbgroups []*msg.ActiveGroup if len(groups) > 0 { groupIDs := utils.Slice(groups, func(e *unrelation.GroupCount) string { return e.GroupID }) - resp, err := m.Group.GetGroupInfos(ctx, groupIDs, false) + resp, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs) if err != nil { return nil, err } diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index dbd8da4d8..404ca6218 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -37,7 +37,7 @@ func (m *msgServer) PullMessageBySeqs( resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) for _, seq := range req.SeqRanges { if !msgprocessor.IsNotification(seq.ConversationID) { - conversation, err := m.Conversation.GetConversation(ctx, req.UserID, seq.ConversationID) + conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, seq.ConversationID) if err != nil { log.ZError(ctx, "GetConversation error", err, "conversationID", seq.ConversationID) continue @@ -140,7 +140,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq } } if len(sendIDs) != 0 { - sendInfos, err := m.User.GetUsersInfo(ctx, sendIDs) + sendInfos, err := m.UserLocalCache.GetUsersInfo(ctx, sendIDs) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq } } if len(recvIDs) != 0 { - recvInfos, err := m.User.GetUsersInfo(ctx, recvIDs) + recvInfos, err := m.UserLocalCache.GetUsersInfo(ctx, recvIDs) if err != nil { return nil, err } @@ -158,7 +158,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq } } if len(groupIDs) != 0 { - groupInfos, err := m.Group.GetGroupInfos(ctx, groupIDs, true) + groupInfos, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs) if err != nil { return nil, err } diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 0af56aaa2..50b7718ce 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -78,7 +78,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe } return nil case constant.SuperGroupChatType: - groupInfo, err := m.Group.GetGroupInfoCache(ctx, data.MsgData.GroupID) + groupInfo, err := m.GroupLocalCache.GetGroupInfo(ctx, data.MsgData.GroupID) if err != nil { return err } @@ -104,9 +104,9 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe return errs.ErrNotInGroupYet.Wrap() } - groupMemberInfo, err := m.Group.GetGroupMemberCache(ctx, data.MsgData.GroupID, data.MsgData.SendID) + groupMemberInfo, err := m.GroupLocalCache.GetGroupMember(ctx, data.MsgData.GroupID, data.MsgData.SendID) if err != nil { - if err == errs.ErrRecordNotFound { + if errs.ErrRecordNotFound.Is(err) { return errs.ErrNotInGroupYet.Wrap(err.Error()) } return err @@ -188,7 +188,7 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt( pb *msg.SendMsgReq, ) (bool, error) { defer log.ZDebug(ctx, "modifyMessageByUserMessageReceiveOpt return") - opt, err := m.User.GetUserGlobalMsgRecvOpt(ctx, userID) // todo local cache + opt, err := m.UserLocalCache.GetUserGlobalMsgRecvOpt(ctx, userID) if err != nil { return false, err } diff --git a/pkg/common/cachekey/user.go b/pkg/common/cachekey/user.go index fbea5168b..3fb877e22 100644 --- a/pkg/common/cachekey/user.go +++ b/pkg/common/cachekey/user.go @@ -1,14 +1,14 @@ package cachekey const ( - userInfoKey = "USER_INFO:" - userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" + UserInfoKey = "USER_INFO:" + UserGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" ) func GetUserInfoKey(userID string) string { - return userInfoKey + userID + return UserInfoKey + userID } func GetUserGlobalRecvMsgOptKey(userID string) string { - return userGlobalRecvMsgOptKey + userID + return UserGlobalRecvMsgOptKey + userID } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index b06209c16..3cc1f4e6e 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -383,8 +383,9 @@ func (l LocalCache) Enable() bool { } type localCache struct { - Friend LocalCache `yaml:"friend"` + User LocalCache `yaml:"user"` Group LocalCache `yaml:"group"` + Friend LocalCache `yaml:"friend"` Conversation LocalCache `yaml:"conversation"` } diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index 716c9eef0..8328306ff 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -16,6 +16,7 @@ package cache import ( "context" + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" @@ -55,7 +56,9 @@ func NewBlackCacheRedis( ) BlackCache { rcClient := rockscache.NewClient(rdb, options) mc := NewMetaCacheRedis(rcClient) - mc.SetTopic(config.Config.LocalCache.Friend.Topic) + b := config.Config.LocalCache.Friend + log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable()) + mc.SetTopic(b.Topic) mc.SetRawRedisClient(rdb) return &BlackCacheRedis{ expireTime: blackExpireTime, diff --git a/pkg/common/db/cache/config.go b/pkg/common/db/cache/config.go index 7fd08e247..52ece95f7 100644 --- a/pkg/common/db/cache/config.go +++ b/pkg/common/db/cache/config.go @@ -21,9 +21,13 @@ func getPublishKey(topic string, key []string) []string { Local config.LocalCache Keys []string }{ + { + Local: config.Config.LocalCache.User, + Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, + }, { Local: config.Config.LocalCache.Group, - Keys: []string{cachekey.GroupMemberIDsKey}, + Keys: []string{cachekey.GroupMemberIDsKey, cachekey.GroupInfoKey, cachekey.GroupMemberInfoKey}, }, { Local: config.Config.LocalCache.Friend, diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 9c0391e67..61489ff92 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -17,6 +17,7 @@ package cache import ( "context" "errors" + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "math/big" @@ -87,7 +88,9 @@ type ConversationCache interface { func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache { rcClient := rockscache.NewClient(rdb, opts) mc := NewMetaCacheRedis(rcClient) - mc.SetTopic(config.Config.LocalCache.Conversation.Topic) + c := config.Config.LocalCache.Conversation + log.ZDebug(context.Background(), "black local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable()) + mc.SetTopic(c.Topic) mc.SetRawRedisClient(rdb) return &ConversationRedisCache{ rcClient: rcClient, diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 432d08572..d09d00312 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -65,7 +65,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo rcClient := rockscache.NewClient(rdb, options) mc := NewMetaCacheRedis(rcClient) f := config.Config.LocalCache.Friend - log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize) + log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable()) mc.SetTopic(f.Topic) mc.SetRawRedisClient(rdb) return &FriendCacheRedis{ diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 783f2e515..71f5d06fd 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -106,7 +106,9 @@ func NewGroupCacheRedis( ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) mc := NewMetaCacheRedis(rcClient) - mc.SetTopic(config.Config.LocalCache.Group.Topic) + g := config.Config.LocalCache.Group + mc.SetTopic(g.Topic) + log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable()) mc.SetRawRedisClient(rdb) return &GroupCacheRedis{ rcClient: rcClient, expireTime: groupExpireTime, diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 14ed7988e..c18f2af25 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "hash/crc32" "strconv" "time" @@ -73,7 +74,11 @@ func NewUserCacheRedis( options rockscache.Options, ) UserCache { rcClient := rockscache.NewClient(rdb, options) - + mc := NewMetaCacheRedis(rcClient) + u := config.Config.LocalCache.User + log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable()) + mc.SetTopic(u.Topic) + mc.SetRawRedisClient(rdb) return &UserCacheRedis{ rdb: rdb, metaCache: NewMetaCacheRedis(rcClient), @@ -86,7 +91,7 @@ func NewUserCacheRedis( func (u *UserCacheRedis) NewCache() UserCache { return &UserCacheRedis{ rdb: u.rdb, - metaCache: NewMetaCacheRedis(u.rcClient, u.metaCache.GetPreDelKeys()...), + metaCache: u.Copy(), userDB: u.userDB, expireTime: u.expireTime, rcClient: u.rcClient, diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 95520f7cb..061b05bec 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -3,6 +3,7 @@ package rpccache import ( "context" pbconversation "github.com/OpenIMSDK/protocol/conversation" + "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" @@ -69,3 +70,18 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con } return conv.RecvMsgOpt, nil } + +func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { + conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) + for _, conversationID := range conversationIDs { + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + conversations = append(conversations, conversation) + } + return conversations, nil +} diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index c3dfd9d93..4b4720539 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -2,6 +2,8 @@ package rpccache import ( "context" + "github.com/OpenIMSDK/protocol/sdkws" + "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" @@ -61,3 +63,78 @@ func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID strin } return res.Map, nil } + +func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val *sdkws.GroupInfo, err error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID) + defer func() { + if err == nil { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val) + } else { + log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err) + } + }() + return localcache.AnyValue[*sdkws.GroupInfo](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID) + return g.client.GetGroupInfoCache(ctx, groupID) + })) +} + +func (g *GroupLocalCache) GetGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) { + groupInfos := make([]*sdkws.GroupInfo, 0, len(groupIDs)) + for _, groupID := range groupIDs { + groupInfo, err := g.GetGroupInfo(ctx, groupID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + groupInfos = append(groupInfos, groupInfo) + } + return groupInfos, nil +} + +func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID string) (val *sdkws.GroupMemberFullInfo, err error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID, "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val) + } else { + log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err) + } + }() + return localcache.AnyValue[*sdkws.GroupMemberFullInfo](g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID) + return g.client.GetGroupMemberCache(ctx, groupID, userID) + })) +} + +func (g *GroupLocalCache) GetGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) { + members := make([]*sdkws.GroupMemberFullInfo, 0, len(userIDs)) + for _, userID := range userIDs { + member, err := g.GetGroupMember(ctx, groupID, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + members = append(members, member) + } + return members, nil +} + +func (g *GroupLocalCache) GetGroupMemberInfoMap(ctx context.Context, groupID string, userIDs []string) (map[string]*sdkws.GroupMemberFullInfo, error) { + members := make(map[string]*sdkws.GroupMemberFullInfo) + for _, userID := range userIDs { + member, err := g.GetGroupMember(ctx, groupID, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + members[userID] = member + } + return members, nil +} diff --git a/pkg/rpccache/user.go b/pkg/rpccache/user.go new file mode 100644 index 000000000..fd4c24c75 --- /dev/null +++ b/pkg/rpccache/user.go @@ -0,0 +1,94 @@ +package rpccache + +import ( + "context" + "github.com/OpenIMSDK/protocol/sdkws" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewUserLocalCache(client rpcclient.UserRpcClient, cli redis.UniversalClient) *UserLocalCache { + lc := config.Config.LocalCache.User + log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) + x := &UserLocalCache{ + client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), + ), + } + if lc.Enable() { + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + } + return x +} + +type UserLocalCache struct { + client rpcclient.UserRpcClient + local localcache.Cache[any] +} + +func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *sdkws.UserInfo, err error) { + log.ZDebug(ctx, "UserLocalCache GetUserInfo req", "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "UserLocalCache GetUserInfo return", "value", val) + } else { + log.ZError(ctx, "UserLocalCache GetUserInfo return", err) + } + }() + return localcache.AnyValue[*sdkws.UserInfo](u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "UserLocalCache GetUserInfo rpc", "userID", userID) + return u.client.GetUserInfo(ctx, userID) + })) +} + +func (u *UserLocalCache) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (val int32, err error) { + log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt req", "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", "value", val) + } else { + log.ZError(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", err) + } + }() + return localcache.AnyValue[int32](u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID) + return u.client.GetUserGlobalMsgRecvOpt(ctx, userID) + })) +} + +func (u *UserLocalCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) { + users := make([]*sdkws.UserInfo, 0, len(userIDs)) + for _, userID := range userIDs { + user, err := u.GetUserInfo(ctx, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + users = append(users, user) + } + return users, nil +} + +func (u *UserLocalCache) GetUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { + users := make(map[string]*sdkws.UserInfo, len(userIDs)) + for _, userID := range userIDs { + user, err := u.GetUserInfo(ctx, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + users[userID] = user + } + return users, nil +}