diff --git a/deployments/templates/config.yaml b/deployments/templates/config.yaml index 0aa6e68d6..5da6d5d0b 100644 --- a/deployments/templates/config.yaml +++ b/deployments/templates/config.yaml @@ -535,3 +535,39 @@ prometheus: rtcPrometheusPort: [ ${RTC_PROM_PORT} ] thirdPrometheusPort: [ ${THIRD_PROM_PORT} ] messageTransferPrometheusPort: [ ${MSG_TRANSFER_PROM_PORT} ] # List of ports + +###################### LocalCache configuration information ###################### +# topic: redis subscriber channel +# slotNum: number of slots, multiple slots can prevent too many keys from competing for a lock +# slotSize: number of slots, the number of cached keys per slot, the overall cache quantity is slotNum * slotSize +# successExpire: successful cache time seconds +# failedExpire: failed cache time seconds +# disable local caching and annotate topic, slotNum, and slotSize +localCache: + user: + topic: DELETE_CACHE_USER + slotNum: 100 + slotSize: 2000 + successExpire: 300 + failedExpire: 5 + + group: + topic: DELETE_CACHE_GROUP + slotNum: 100 + slotSize: 2000 + successExpire: 300 + failedExpire: 5 + + friend: + topic: DELETE_CACHE_FRIEND + slotNum: 100 + slotSize: 2000 + successExpire: 300 + failedExpire: 5 + + conversation: + topic: DELETE_CACHE_CONVERSATION + slotNum: 100 + slotSize: 2000 + successExpire: 300 + failedExpire: 5 \ No newline at end of file diff --git a/go.mod b/go.mod index 38443ba5a..10f08fa53 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/minio/minio-go/v7 v7.0.67 github.com/mitchellh/mapstructure v1.5.0 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect + github.com/openimsdk/localcache v0.0.1 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/robfig/cron/v3 v3.0.1 @@ -84,6 +85,7 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect @@ -162,3 +164,5 @@ require ( golang.org/x/crypto v0.19.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace github.com/openimsdk/localcache => ./pkg/localcache diff --git a/go.sum b/go.sum index aa41279af..0bba2d65f 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,12 @@ cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYE firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= +github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/OpenIMSDK/protocol v0.0.53 h1:PtePLTqMYRHjWSf8XIL3x5JRC3YoySTMA6tRKfbUjQY= +github.com/OpenIMSDK/protocol v0.0.53/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/tools v0.0.29 h1:NS4PEwYl9sX3SWsMjDOLVxMo3LcTWREMr+2cjzWjcqc= +github.com/OpenIMSDK/tools v0.0.29/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw= github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE= github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA= @@ -178,6 +184,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= diff --git a/internal/msggateway/callback.go b/internal/msggateway/callback.go index ab8c1f51f..afb83bcc4 100644 --- a/internal/msggateway/callback.go +++ b/internal/msggateway/callback.go @@ -100,7 +100,7 @@ func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig, // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID // string) cbApi.CommonCallbackResp { // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} -// if !config.Config.Callback.CallbackUserOnline.Enable { +// if !config.Config.Callback.CallbackUserOnline.WithEnable { // return callbackResp // } // callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{ @@ -129,7 +129,7 @@ func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig, //} //func callbackUserOffline(operationID, userID string, platformID int, connID string) cbApi.CommonCallbackResp { // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} -// if !config.Config.Callback.CallbackUserOffline.Enable { +// if !config.Config.Callback.CallbackUserOffline.WithEnable { // return callbackResp // } // callbackOfflineReq := cbApi.CallbackUserOfflineReq{ @@ -156,7 +156,7 @@ func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig, //} //func callbackUserKickOff(operationID string, userID string, platformID int) cbApi.CommonCallbackResp { // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} -// if !config.Config.Callback.CallbackUserKickOff.Enable { +// if !config.Config.Callback.CallbackUserKickOff.WithEnable { // return callbackResp // } // callbackUserKickOffReq := cbApi.CallbackUserKickOffReq{ diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 739c4232f..146565561 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -23,7 +23,6 @@ import ( "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" - "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -58,22 +57,25 @@ type Server struct { rpcPort int prometheusPort int LongConnServer LongConnServer - pushTerminal []int config *config.GlobalConfig + pushTerminal map[int]struct{} } func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, config *config.GlobalConfig) *Server { - return &Server{ +func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *config.GlobalConfig) *Server { + s := &Server{ rpcPort: rpcPort, prometheusPort: proPort, LongConnServer: longConnServer, - pushTerminal: []int{constant.IOSPlatformID, constant.AndroidPlatformID}, - config: config, + pushTerminal: make(map[int]struct{}), + config: conf, } + s.pushTerminal[constant.IOSPlatformID] = struct{}{} + s.pushTerminal[constant.AndroidPlatformID] = struct{}{} + return s } func (s *Server) OnlinePushMsg( @@ -127,13 +129,9 @@ func (s *Server) OnlineBatchPushOneMsg( panic("implement me") } -func (s *Server) SuperGroupOnlineBatchPushOneMsg( - ctx context.Context, - req *msggateway.OnlineBatchPushOneMsgReq, +func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq, ) (*msggateway.OnlineBatchPushOneMsgResp, error) { - var singleUserResults []*msggateway.SingleMsgToUserResults - for _, v := range req.PushToUserIDs { var resp []*msggateway.SingleMsgToUserPlatform results := &msggateway.SingleMsgToUserResults{ @@ -154,23 +152,22 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg( } userPlatform := &msggateway.SingleMsgToUserPlatform{ - RecvID: v, RecvPlatFormID: int32(client.PlatformID), } if !client.IsBackground || (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { err := client.PushMessage(ctx, req.MsgData) if err != nil { - userPlatform.ResultCode = -2 + userPlatform.ResultCode = int64(errs.ErrPushMsgErr.Code()) resp = append(resp, userPlatform) } else { - if utils.IsContainInt(client.PlatformID, s.pushTerminal) { + if _, ok := s.pushTerminal[client.PlatformID]; ok { results.OnlinePush = true resp = append(resp, userPlatform) } } } else { - userPlatform.ResultCode = -3 + userPlatform.ResultCode = int64(errs.ErrIOSBackgroundPushErr.Code()) resp = append(resp, userPlatform) } } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 2d2f95336..d5d4c9242 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,6 +16,7 @@ package push import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/OpenIMSDK/protocol/constant" pbpush "github.com/OpenIMSDK/protocol/push" @@ -25,7 +26,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "google.golang.org/grpc" ) @@ -51,8 +51,8 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg client, offlinePusher, database, - localcache.NewGroupLocalCache(&groupRpcClient), - localcache.NewConversationLocalCache(&conversationRpcClient), + rpccache.NewGroupLocalCache(groupRpcClient, rdb), + rpccache.NewConversationLocalCache(conversationRpcClient, rdb), &conversationRpcClient, &groupRpcClient, &msgRpcClient, diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 49bce70ab..2bf17eaf9 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "sync" "github.com/OpenIMSDK/protocol/constant" @@ -36,7 +37,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" @@ -49,8 +49,8 @@ type Pusher struct { database controller.PushDatabase discov discoveryregistry.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher - groupLocalCache *localcache.GroupLocalCache - conversationLocalCache *localcache.ConversationLocalCache + groupLocalCache *rpccache.GroupLocalCache + conversationLocalCache *rpccache.ConversationLocalCache msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient @@ -59,7 +59,7 @@ type Pusher struct { var errNoOfflinePusher = errors.New("no offlinePusher is configured") func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, - groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, + groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { return &Pusher{ diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 15e7998af..81b4d9d45 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -48,14 +48,6 @@ type conversationServer struct { config *config.GlobalConfig } -func (c *conversationServer) GetConversationNotReceiveMessageUserIDs( - ctx context.Context, - req *pbconversation.GetConversationNotReceiveMessageUserIDsReq, -) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) { - //TODO implement me - panic("implement me") -} - func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis(config) if err != nil { @@ -96,11 +88,11 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers return resp, nil } -func (m *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) { +func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) { log.ZDebug(ctx, "GetSortedConversationList", "seqs", req, "userID", req.UserID) var conversationIDs []string if len(req.ConversationIDs) == 0 { - conversationIDs, err = m.conversationDatabase.GetConversationIDs(ctx, req.UserID) + conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.UserID) if err != nil { return nil, err } @@ -108,7 +100,7 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req conversationIDs = req.ConversationIDs } - conversations, err := m.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs) + conversations, err := c.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } @@ -116,22 +108,22 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req return nil, errs.ErrRecordNotFound.Wrap() } - maxSeqs, err := m.msgRpcClient.GetMaxSeqs(ctx, conversationIDs) + maxSeqs, err := c.msgRpcClient.GetMaxSeqs(ctx, conversationIDs) if err != nil { return nil, err } - chatLogs, err := m.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs) + chatLogs, err := c.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs) if err != nil { return nil, err } - conversationMsg, err := m.getConversationInfo(ctx, chatLogs, req.UserID) + conversationMsg, err := c.getConversationInfo(ctx, chatLogs, req.UserID) if err != nil { return nil, err } - hasReadSeqs, err := m.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs) + hasReadSeqs, err := c.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } @@ -163,8 +155,8 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req UnreadTotal: unreadTotal, } - m.conversationSort(conversation_isPinTime, resp, conversation_unreadCount, conversationMsg) - m.conversationSort(conversation_notPinTime, resp, conversation_unreadCount, conversationMsg) + c.conversationSort(conversation_isPinTime, resp, conversation_unreadCount, conversationMsg) + c.conversationSort(conversation_notPinTime, resp, conversation_unreadCount, conversationMsg) resp.ConversationElems = utils.Paginate(resp.ConversationElems, int(req.Pagination.GetPageNumber()), int(req.Pagination.GetShowNumber())) return resp, nil @@ -535,3 +527,11 @@ func (c *conversationServer) getConversationInfo( } return conversationMsg, nil } + +func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context.Context, req *pbconversation.GetConversationNotReceiveMessageUserIDsReq) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) { + userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID) + if err != nil { + return nil, err + } + return &pbconversation.GetConversationNotReceiveMessageUserIDsResp{UserIDs: userIDs}, nil +} diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 6403a4159..9466c5224 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -16,26 +16,31 @@ package friend import ( "context" + "github.com/OpenIMSDK/tools/tx" + + "github.com/OpenIMSDK/protocol/sdkws" + + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + + "github.com/OpenIMSDK/tools/log" + + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + + "google.golang.org/grpc" "github.com/OpenIMSDK/protocol/constant" pbfriend "github.com/OpenIMSDK/protocol/friend" - "github.com/OpenIMSDK/protocol/sdkws" registry "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/log" - "github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" - "google.golang.org/grpc" ) type friendServer struct { diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index f16fdc62f..ef7c72368 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -41,7 +41,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m if err != nil { return nil, err } - conversations, err := m.Conversation.GetConversations(ctx, req.UserID, conversationIDs) + conversations, err := m.ConversationLocalCache.GetConversations(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } @@ -104,7 +104,7 @@ func (m *msgServer) MarkMsgsAsRead( if hasReadSeq > maxSeq { return nil, errs.ErrArgs.Wrap("hasReadSeq must not be bigger than maxSeq") } - conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) + conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return } @@ -144,7 +144,7 @@ func (m *msgServer) MarkConversationAsRead( ctx context.Context, req *msg.MarkConversationAsReadReq, ) (resp *msg.MarkConversationAsReadResp, err error) { - conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) + conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return nil, err } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 5ac8804c2..99690b0cc 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -44,7 +44,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if err := authverify.CheckAccessV3(ctx, req.UserID, m.config); err != nil { return nil, err } - user, err := m.User.GetUserInfo(ctx, req.UserID) + user, err := m.UserLocalCache.GetUserInfo(ctx, req.UserID) if err != nil { return nil, err } @@ -70,12 +70,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. } role = user.AppMangerLevel case constant.SuperGroupChatType: - members, err := m.Group.GetGroupMemberInfoMap( - ctx, - msgs[0].GroupID, - utils.Distinct([]string{req.UserID, msgs[0].SendID}), - true, - ) + members, err := m.GroupLocalCache.GetGroupMemberInfoMap(ctx, msgs[0].GroupID, utils.Distinct([]string{req.UserID, msgs[0].SendID})) if err != nil { return nil, err } diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 949a880b9..ea59c40cf 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -97,7 +97,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa } tagAll := utils.IsContain(constant.AtAllString, msg.AtUserIDList) if tagAll { - memberUserIDList, err := m.Group.GetGroupMemberIDs(ctx, msg.GroupID) + memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID) if err != nil { log.ZWarn(ctx, "GetGroupMemberIDs", err) return @@ -143,6 +143,7 @@ func (m *msgServer) sendMsgNotification( } func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) { + log.ZDebug(ctx, "sendMsgSingleChat return") if err := m.messageVerification(ctx, req); err != nil { return nil, err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 72e0b065a..6212dea03 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -15,6 +15,8 @@ package msg import ( + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" + "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/protocol/msg" @@ -22,7 +24,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "google.golang.org/grpc" @@ -33,12 +34,11 @@ type ( msgServer struct { RegisterCenter discoveryregistry.SvcDiscoveryRegistry MsgDatabase controller.CommonMsgDatabase - Group *rpcclient.GroupRpcClient - User *rpcclient.UserRpcClient Conversation *rpcclient.ConversationRpcClient - friend *rpcclient.FriendRpcClient - GroupLocalCache *localcache.GroupLocalCache - ConversationLocalCache *localcache.ConversationLocalCache + UserLocalCache *rpccache.UserLocalCache + FriendLocalCache *rpccache.FriendLocalCache + GroupLocalCache *rpccache.GroupLocalCache + ConversationLocalCache *rpccache.ConversationLocalCache Handlers MessageInterceptorChain notificationSender *rpcclient.NotificationSender config *config.GlobalConfig @@ -84,13 +84,12 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg } s := &msgServer{ Conversation: &conversationClient, - User: &userRpcClient, - Group: &groupRpcClient, MsgDatabase: msgDatabase, RegisterCenter: client, - GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), - ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), - friend: &friendRpcClient, + UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, rdb), + GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, rdb), + ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, rdb), + FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, rdb), config: config, } s.notificationSender = rpcclient.NewNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg)) diff --git a/internal/rpc/msg/statistics.go b/internal/rpc/msg/statistics.go index d7d33459b..e62954dea 100644 --- a/internal/rpc/msg/statistics.go +++ b/internal/rpc/msg/statistics.go @@ -40,7 +40,7 @@ func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq var pbUsers []*msg.ActiveUser if len(users) > 0 { userIDs := utils.Slice(users, func(e *unrelation.UserCount) string { return e.UserID }) - userMap, err := m.User.GetUsersInfoMap(ctx, userIDs) + userMap, err := m.UserLocalCache.GetUsersInfoMap(ctx, userIDs) if err != nil { return nil, err } @@ -82,7 +82,7 @@ func (m *msgServer) GetActiveGroup(ctx context.Context, req *msg.GetActiveGroupR var pbgroups []*msg.ActiveGroup if len(groups) > 0 { groupIDs := utils.Slice(groups, func(e *unrelation.GroupCount) string { return e.GroupID }) - resp, err := m.Group.GetGroupInfos(ctx, groupIDs, false) + resp, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs) if err != nil { return nil, err } diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index b714da375..6fa4a0c9d 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -35,7 +35,7 @@ func (m *msgServer) PullMessageBySeqs( resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) for _, seq := range req.SeqRanges { if !msgprocessor.IsNotification(seq.ConversationID) { - conversation, err := m.Conversation.GetConversation(ctx, req.UserID, seq.ConversationID) + conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, seq.ConversationID) if err != nil { log.ZError(ctx, "GetConversation error", err, "conversationID", seq.ConversationID) continue @@ -138,7 +138,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq } } if len(sendIDs) != 0 { - sendInfos, err := m.User.GetUsersInfo(ctx, sendIDs) + sendInfos, err := m.UserLocalCache.GetUsersInfo(ctx, sendIDs) if err != nil { return nil, err } @@ -147,7 +147,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq } } if len(recvIDs) != 0 { - recvInfos, err := m.User.GetUsersInfo(ctx, recvIDs) + recvInfos, err := m.UserLocalCache.GetUsersInfo(ctx, recvIDs) if err != nil { return nil, err } @@ -156,7 +156,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq } } if len(groupIDs) != 0 { - groupInfos, err := m.Group.GetGroupInfos(ctx, groupIDs, true) + groupInfos, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs) if err != nil { return nil, err } diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index d72e4923e..318464cf8 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/OpenIMSDK/tools/log" "math/rand" "strconv" "time" @@ -59,15 +60,15 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe data.MsgData.ContentType >= constant.NotificationBegin { return nil } - black, err := m.friend.IsBlocked(ctx, data.MsgData.SendID, data.MsgData.RecvID) + black, err := m.FriendLocalCache.IsBlack(ctx, data.MsgData.SendID, data.MsgData.RecvID) if err != nil { return err } if black { return errs.ErrBlockedByPeer.Wrap() } - if *m.config.MessageVerify.FriendVerify { - friend, err := m.friend.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID) + if m.config.MessageVerify.FriendVerify != nil && *m.config.MessageVerify.FriendVerify { + friend, err := m.FriendLocalCache.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID) if err != nil { return err } @@ -78,7 +79,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe } return nil case constant.SuperGroupChatType: - groupInfo, err := m.Group.GetGroupInfoCache(ctx, data.MsgData.GroupID) + groupInfo, err := m.GroupLocalCache.GetGroupInfo(ctx, data.MsgData.GroupID) if err != nil { return err } @@ -99,17 +100,17 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe data.MsgData.ContentType >= constant.NotificationBegin { return nil } - // memberIDs, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, data.MsgData.GroupID) - // if err != nil { - // return err - // } - // if !utils.IsContain(data.MsgData.SendID, memberIDs) { - // return errs.ErrNotInGroupYet.Wrap() - // } + memberIDs, err := m.GroupLocalCache.GetGroupMemberIDMap(ctx, data.MsgData.GroupID) + if err != nil { + return err + } + if _, ok := memberIDs[data.MsgData.SendID]; !ok { + return errs.ErrNotInGroupYet.Wrap() + } - groupMemberInfo, err := m.Group.GetGroupMemberCache(ctx, data.MsgData.GroupID, data.MsgData.SendID) + groupMemberInfo, err := m.GroupLocalCache.GetGroupMember(ctx, data.MsgData.GroupID, data.MsgData.SendID) if err != nil { - if err == errs.ErrRecordNotFound { + if errs.ErrRecordNotFound.Is(err) { return errs.ErrNotInGroupYet.Wrap(err.Error()) } return err @@ -157,6 +158,9 @@ func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { case constant.Custom: fallthrough case constant.Quote: + utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, true) + utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, true) + utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, true) case constant.Revoke: utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) @@ -187,7 +191,8 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt( sessionType int, pb *msg.SendMsgReq, ) (bool, error) { - opt, err := m.User.GetUserGlobalMsgRecvOpt(ctx, userID) + defer log.ZDebug(ctx, "modifyMessageByUserMessageReceiveOpt return") + opt, err := m.UserLocalCache.GetUserGlobalMsgRecvOpt(ctx, userID) if err != nil { return false, err } @@ -203,7 +208,7 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt( return true, nil } // conversationID := utils.GetConversationIDBySessionType(conversationID, sessionType) - singleOpt, err := m.Conversation.GetSingleConversationRecvMsgOpt(ctx, userID, conversationID) + singleOpt, err := m.ConversationLocalCache.GetSingleConversationRecvMsgOpt(ctx, userID, conversationID) if errs.ErrRecordNotFound.Is(err) { return true, nil } else if err != nil { diff --git a/pkg/common/cachekey/black.go b/pkg/common/cachekey/black.go new file mode 100644 index 000000000..527ad14dc --- /dev/null +++ b/pkg/common/cachekey/black.go @@ -0,0 +1,15 @@ +package cachekey + +const ( + BlackIDsKey = "BLACK_IDS:" + IsBlackKey = "IS_BLACK:" // local cache +) + +func GetBlackIDsKey(ownerUserID string) string { + return BlackIDsKey + ownerUserID + +} + +func GetIsBlackIDsKey(possibleBlackUserID, userID string) string { + return IsBlackKey + userID + "-" + possibleBlackUserID +} diff --git a/pkg/common/cachekey/conversation.go b/pkg/common/cachekey/conversation.go new file mode 100644 index 000000000..665ca11c6 --- /dev/null +++ b/pkg/common/cachekey/conversation.go @@ -0,0 +1,44 @@ +package cachekey + +const ( + ConversationKey = "CONVERSATION:" + ConversationIDsKey = "CONVERSATION_IDS:" + ConversationIDsHashKey = "CONVERSATION_IDS_HASH:" + ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" + RecvMsgOptKey = "RECV_MSG_OPT:" + SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" +) + +func GetConversationKey(ownerUserID, conversationID string) string { + return ConversationKey + ownerUserID + ":" + conversationID +} + +func GetConversationIDsKey(ownerUserID string) string { + return ConversationIDsKey + ownerUserID +} + +func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { + return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID +} + +func GetRecvMsgOptKey(ownerUserID, conversationID string) string { + return RecvMsgOptKey + ownerUserID + ":" + conversationID +} + +func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { + return SuperGroupRecvMsgNotNotifyUserIDsHashKey + groupID +} + +func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string { + return ConversationHasReadSeqKey + ownerUserID + ":" + conversationID +} + +func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string { + return ConversationNotReceiveMessageUserIDsKey + conversationID +} + +func GetUserConversationIDsHashKey(ownerUserID string) string { + return ConversationIDsHashKey + ownerUserID +} diff --git a/pkg/common/cachekey/friend.go b/pkg/common/cachekey/friend.go new file mode 100644 index 000000000..f37c9da37 --- /dev/null +++ b/pkg/common/cachekey/friend.go @@ -0,0 +1,24 @@ +package cachekey + +const ( + FriendIDsKey = "FRIEND_IDS:" + TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + FriendKey = "FRIEND_INFO:" + IsFriendKey = "IS_FRIEND:" // local cache key +) + +func GetFriendIDsKey(ownerUserID string) string { + return FriendIDsKey + ownerUserID +} + +func GetTwoWayFriendsIDsKey(ownerUserID string) string { + return TwoWayFriendsIDsKey + ownerUserID +} + +func GetFriendKey(ownerUserID, friendUserID string) string { + return FriendKey + ownerUserID + "-" + friendUserID +} + +func GetIsFriendKey(possibleFriendUserID, userID string) string { + return IsFriendKey + possibleFriendUserID + "-" + userID +} diff --git a/pkg/common/cachekey/group.go b/pkg/common/cachekey/group.go new file mode 100644 index 000000000..1dcf0ffce --- /dev/null +++ b/pkg/common/cachekey/group.go @@ -0,0 +1,45 @@ +package cachekey + +import ( + "strconv" + "time" +) + +const ( + groupExpireTime = time.Second * 60 * 60 * 12 + GroupInfoKey = "GROUP_INFO:" + GroupMemberIDsKey = "GROUP_MEMBER_IDS:" + GroupMembersHashKey = "GROUP_MEMBERS_HASH2:" + GroupMemberInfoKey = "GROUP_MEMBER_INFO:" + JoinedGroupsKey = "JOIN_GROUPS_KEY:" + GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" +) + +func GetGroupInfoKey(groupID string) string { + return GroupInfoKey + groupID +} + +func GetJoinedGroupsKey(userID string) string { + return JoinedGroupsKey + userID +} + +func GetGroupMembersHashKey(groupID string) string { + return GroupMembersHashKey + groupID +} + +func GetGroupMemberIDsKey(groupID string) string { + return GroupMemberIDsKey + groupID +} + +func GetGroupMemberInfoKey(groupID, userID string) string { + return GroupMemberInfoKey + groupID + "-" + userID +} + +func GetGroupMemberNumKey(groupID string) string { + return GroupMemberNumKey + groupID +} + +func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { + return GroupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) +} diff --git a/pkg/common/cachekey/user.go b/pkg/common/cachekey/user.go new file mode 100644 index 000000000..3fb877e22 --- /dev/null +++ b/pkg/common/cachekey/user.go @@ -0,0 +1,14 @@ +package cachekey + +const ( + UserInfoKey = "USER_INFO:" + UserGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" +) + +func GetUserInfoKey(userID string) string { + return UserInfoKey + userID +} + +func GetUserGlobalRecvMsgOptKey(userID string) string { + return UserGlobalRecvMsgOptKey + userID +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index ac42395bf..8bc871355 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -16,6 +16,7 @@ package config import ( "bytes" + "time" "github.com/OpenIMSDK/tools/discoveryregistry" "gopkg.in/yaml.v3" @@ -266,6 +267,8 @@ type GlobalConfig struct { FriendVerify *bool `yaml:"friendVerify"` } `yaml:"messageVerify"` + LocalCache localCache `yaml:"localCache"` + IOSPush struct { PushSound string `yaml:"pushSound"` BadgeCount bool `yaml:"badgeCount"` @@ -382,6 +385,33 @@ type notification struct { ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` } +type LocalCache struct { + Topic string `yaml:"topic"` + SlotNum int `yaml:"slotNum"` + SlotSize int `yaml:"slotSize"` + SuccessExpire int `yaml:"successExpire"` // second + FailedExpire int `yaml:"failedExpire"` // second +} + +func (l LocalCache) Failed() time.Duration { + return time.Second * time.Duration(l.FailedExpire) +} + +func (l LocalCache) Success() time.Duration { + return time.Second * time.Duration(l.SuccessExpire) +} + +func (l LocalCache) Enable() bool { + return l.Topic != "" && l.SlotNum > 0 && l.SlotSize > 0 +} + +type localCache struct { + User LocalCache `yaml:"user"` + Group LocalCache `yaml:"group"` + Friend LocalCache `yaml:"friend"` + Conversation LocalCache `yaml:"conversation"` +} + func (c *GlobalConfig) GetServiceNames() []string { return []string{ c.RpcRegisterName.OpenImUserName, diff --git a/pkg/common/config/parse_test.go b/pkg/common/config/parse_test.go index 84dee1165..9b964a277 100644 --- a/pkg/common/config/parse_test.go +++ b/pkg/common/config/parse_test.go @@ -16,6 +16,8 @@ package config import ( _ "embed" + "fmt" + "gopkg.in/yaml.v3" "reflect" "testing" @@ -116,3 +118,13 @@ func TestInitConfig(t *testing.T) { }) } } + +func TestName(t *testing.T) { + Config.LocalCache.Friend.Topic = "friend" + Config.LocalCache.Friend.SlotNum = 500 + Config.LocalCache.Friend.SlotSize = 20000 + + data, _ := yaml.Marshal(&Config) + + fmt.Println(string(data)) +} diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index c8cf2ddd3..fd743e917 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -16,6 +16,9 @@ package cache import ( "context" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" "github.com/dtm-labs/rockscache" @@ -47,11 +50,15 @@ type BlackCacheRedis struct { func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB relationtb.BlackModelInterface, options rockscache.Options) BlackCache { rcClient := rockscache.NewClient(rdb, options) - + mc := NewMetaCacheRedis(rcClient) + b := config.Config.LocalCache.Friend + log.ZDebug(context.Background(), "black local cache init", "Topic", b.Topic, "SlotNum", b.SlotNum, "SlotSize", b.SlotSize, "enable", b.Enable()) + mc.SetTopic(b.Topic) + mc.SetRawRedisClient(rdb) return &BlackCacheRedis{ expireTime: blackExpireTime, rcClient: rcClient, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, blackDB: blackDB, } } @@ -61,12 +68,12 @@ func (b *BlackCacheRedis) NewCache() BlackCache { expireTime: b.expireTime, rcClient: b.rcClient, blackDB: b.blackDB, - metaCache: NewMetaCacheRedis(b.rcClient, b.metaCache.GetPreDelKeys()...), + metaCache: b.Copy(), } } func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { - return blackIDsKey + ownerUserID + return cachekey.GetBlackIDsKey(ownerUserID) } func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { diff --git a/pkg/common/db/cache/config.go b/pkg/common/db/cache/config.go new file mode 100644 index 000000000..7599d8a11 --- /dev/null +++ b/pkg/common/db/cache/config.go @@ -0,0 +1,66 @@ +package cache + +import ( + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "strings" + "sync" +) + +var ( + once sync.Once + subscribe map[string][]string +) + +func getPublishKey(topic string, key []string) []string { + if topic == "" || len(key) == 0 { + return nil + } + once.Do(func() { + list := []struct { + Local config.LocalCache + Keys []string + }{ + { + Local: config.Config.LocalCache.User, + Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, + }, + { + Local: config.Config.LocalCache.Group, + Keys: []string{cachekey.GroupMemberIDsKey, cachekey.GroupInfoKey, cachekey.GroupMemberInfoKey}, + }, + { + Local: config.Config.LocalCache.Friend, + Keys: []string{cachekey.FriendIDsKey, cachekey.BlackIDsKey}, + }, + { + Local: config.Config.LocalCache.Conversation, + Keys: []string{cachekey.ConversationKey, cachekey.ConversationIDsKey, cachekey.ConversationNotReceiveMessageUserIDsKey}, + }, + } + subscribe = make(map[string][]string) + for _, v := range list { + if v.Local.Enable() { + subscribe[v.Local.Topic] = v.Keys + } + } + }) + prefix, ok := subscribe[topic] + if !ok { + return nil + } + res := make([]string, 0, len(key)) + for _, k := range key { + var exist bool + for _, p := range prefix { + if strings.HasPrefix(k, p) { + exist = true + break + } + } + if exist { + res = append(res, k) + } + } + return res +} diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 90e400363..ef725642c 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -16,6 +16,9 @@ package cache import ( "context" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "math/big" "strings" "time" @@ -27,14 +30,14 @@ import ( ) const ( - conversationKey = "CONVERSATION:" - conversationIDsKey = "CONVERSATION_IDS:" - conversationIDsHashKey = "CONVERSATION_IDS_HASH:" - conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" - recvMsgOptKey = "RECV_MSG_OPT:" - superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" - superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" - conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" + //conversationKey = "CONVERSATION:" + //conversationIDsKey = "CONVERSATION_IDS:" + //conversationIDsHashKey = "CONVERSATION_IDS_HASH:" + //conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" + //recvMsgOptKey = "RECV_MSG_OPT:" + //superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + //superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + //conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" conversationExpireTime = time.Second * 60 * 60 * 12 ) @@ -81,10 +84,14 @@ type ConversationCache interface { func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache { rcClient := rockscache.NewClient(rdb, opts) - + mc := NewMetaCacheRedis(rcClient) + c := config.Config.LocalCache.Conversation + log.ZDebug(context.Background(), "black local cache init", "Topic", c.Topic, "SlotNum", c.SlotNum, "SlotSize", c.SlotSize, "enable", c.Enable()) + mc.SetTopic(c.Topic) + mc.SetRawRedisClient(rdb) return &ConversationRedisCache{ rcClient: rcClient, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, conversationDB: db, expireTime: conversationExpireTime, } @@ -115,38 +122,42 @@ type ConversationRedisCache struct { func (c *ConversationRedisCache) NewCache() ConversationCache { return &ConversationRedisCache{ rcClient: c.rcClient, - metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), + metaCache: c.Copy(), conversationDB: c.conversationDB, expireTime: c.expireTime, } } func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string { - return conversationKey + ownerUserID + ":" + conversationID + return cachekey.GetConversationKey(ownerUserID, conversationID) } func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string { - return conversationIDsKey + ownerUserID + return cachekey.GetConversationIDsKey(ownerUserID) } func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsKey + groupID + return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID) } func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string { - return recvMsgOptKey + ownerUserID + ":" + conversationID + return cachekey.GetRecvMsgOptKey(ownerUserID, conversationID) } func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID + return cachekey.GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID) } func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string { - return conversationHasReadSeqKey + ownerUserID + ":" + conversationID + return cachekey.GetConversationHasReadSeqKey(ownerUserID, conversationID) } func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string { - return conversationNotReceiveMessageUserIDsKey + conversationID + return cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID) +} + +func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string { + return cachekey.GetUserConversationIDsHashKey(ownerUserID) } func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { @@ -166,10 +177,6 @@ func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) Conversat return cache } -func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string { - return conversationIDsHashKey + ownerUserID -} - func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { return getCache( ctx, diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index a160837ba..30b8bf7ea 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -16,6 +16,9 @@ package cache import ( "context" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" "github.com/OpenIMSDK/tools/utils" @@ -25,10 +28,10 @@ import ( ) const ( - friendExpireTime = time.Second * 60 * 60 * 12 - friendIDsKey = "FRIEND_IDS:" - TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" - friendKey = "FRIEND_INFO:" + friendExpireTime = time.Second * 60 * 60 * 12 + //friendIDsKey = "FRIEND_IDS:" + //TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + //friendKey = "FRIEND_INFO:" ) // FriendCache is an interface for caching friend-related data. @@ -58,8 +61,13 @@ type FriendCacheRedis struct { func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface, options rockscache.Options) FriendCache { rcClient := rockscache.NewClient(rdb, options) + mc := NewMetaCacheRedis(rcClient) + f := config.Config.LocalCache.Friend + log.ZDebug(context.Background(), "friend local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize, "enable", f.Enable()) + mc.SetTopic(f.Topic) + mc.SetRawRedisClient(rdb) return &FriendCacheRedis{ - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, friendDB: friendDB, expireTime: friendExpireTime, rcClient: rcClient, @@ -70,7 +78,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo func (f *FriendCacheRedis) NewCache() FriendCache { return &FriendCacheRedis{ rcClient: f.rcClient, - metaCache: NewMetaCacheRedis(f.rcClient, f.metaCache.GetPreDelKeys()...), + metaCache: f.Copy(), friendDB: f.friendDB, expireTime: f.expireTime, } @@ -78,17 +86,17 @@ func (f *FriendCacheRedis) NewCache() FriendCache { // getFriendIDsKey returns the key for storing friend IDs in the cache. func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string { - return friendIDsKey + ownerUserID + return cachekey.GetFriendIDsKey(ownerUserID) } // getTwoWayFriendsIDsKey returns the key for storing two-way friend IDs in the cache. func (f *FriendCacheRedis) getTwoWayFriendsIDsKey(ownerUserID string) string { - return TwoWayFriendsIDsKey + ownerUserID + return cachekey.GetTwoWayFriendsIDsKey(ownerUserID) } // getFriendKey returns the key for storing friend info in the cache. func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string { - return friendKey + ownerUserID + "-" + friendUserID + return cachekey.GetFriendKey(ownerUserID, friendUserID) } // GetFriendIDs retrieves friend IDs from the cache or the database if not found. diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 4a5266db5..37b21ae90 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -17,7 +17,8 @@ package cache import ( "context" "fmt" - "strconv" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" "github.com/OpenIMSDK/protocol/constant" @@ -30,15 +31,14 @@ import ( ) const ( - groupExpireTime = time.Second * 60 * 60 * 12 - groupInfoKey = "GROUP_INFO:" - groupMemberIDsKey = "GROUP_MEMBER_IDS:" - groupMembersHashKey = "GROUP_MEMBERS_HASH2:" - groupMemberInfoKey = "GROUP_MEMBER_INFO:" - //groupOwnerInfoKey = "GROUP_OWNER_INFO:". - joinedGroupsKey = "JOIN_GROUPS_KEY:" - groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" - groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" + groupExpireTime = time.Second * 60 * 60 * 12 + //groupInfoKey = "GROUP_INFO:" + //groupMemberIDsKey = "GROUP_MEMBER_IDS:" + //groupMembersHashKey = "GROUP_MEMBERS_HASH2:" + //groupMemberInfoKey = "GROUP_MEMBER_INFO:" + //joinedGroupsKey = "JOIN_GROUPS_KEY:" + //groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + //groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" ) type GroupHash interface { @@ -101,12 +101,16 @@ func NewGroupCacheRedis( opts rockscache.Options, ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) - + mc := NewMetaCacheRedis(rcClient) + g := config.Config.LocalCache.Group + mc.SetTopic(g.Topic) + log.ZDebug(context.Background(), "group local cache init", "Topic", g.Topic, "SlotNum", g.SlotNum, "SlotSize", g.SlotSize, "enable", g.Enable()) + mc.SetRawRedisClient(rdb) return &GroupCacheRedis{ rcClient: rcClient, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, groupHash: hashCode, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, } } @@ -117,36 +121,36 @@ func (g *GroupCacheRedis) NewCache() GroupCache { groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, - metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...), + metaCache: g.Copy(), } } func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string { - return groupInfoKey + groupID + return cachekey.GetGroupInfoKey(groupID) } func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string { - return joinedGroupsKey + userID + return cachekey.GetJoinedGroupsKey(userID) } func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string { - return groupMembersHashKey + groupID + return cachekey.GetGroupMembersHashKey(groupID) } func (g *GroupCacheRedis) getGroupMemberIDsKey(groupID string) string { - return groupMemberIDsKey + groupID + return cachekey.GetGroupMemberIDsKey(groupID) } func (g *GroupCacheRedis) getGroupMemberInfoKey(groupID, userID string) string { - return groupMemberInfoKey + groupID + "-" + userID + return cachekey.GetGroupMemberInfoKey(groupID, userID) } func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string { - return groupMemberNumKey + groupID + return cachekey.GetGroupMemberNumKey(groupID) } func (g *GroupCacheRedis) getGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { - return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) + return cachekey.GetGroupRoleLevelMemberIDsKey(groupID, roleLevel) } func (g *GroupCacheRedis) GetGroupIndex(group *relationtb.GroupModel, keys []string) (int, error) { diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index 4d4f077b6..b14851381 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/redis/go-redis/v9" "fmt" "time" @@ -43,6 +44,9 @@ type metaCache interface { AddKeys(keys ...string) ClearKeys() GetPreDelKeys() []string + SetTopic(topic string) + SetRawRedisClient(cli redis.UniversalClient) + Copy() metaCache } func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache { @@ -50,10 +54,36 @@ func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache { } type metaCacheRedis struct { + topic string rcClient *rockscache.Client keys []string maxRetryTimes int retryInterval time.Duration + redisClient redis.UniversalClient +} + +func (m *metaCacheRedis) Copy() metaCache { + var keys []string + if len(m.keys) > 0 { + keys = make([]string, 0, len(m.keys)*2) + keys = append(keys, m.keys...) + } + return &metaCacheRedis{ + topic: m.topic, + rcClient: m.rcClient, + keys: keys, + maxRetryTimes: m.maxRetryTimes, + retryInterval: m.retryInterval, + redisClient: redisClient, + } +} + +func (m *metaCacheRedis) SetTopic(topic string) { + m.topic = topic +} + +func (m *metaCacheRedis) SetRawRedisClient(cli redis.UniversalClient) { + m.redisClient = cli } func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { @@ -61,7 +91,7 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { m.keys = utils.Distinct(m.keys) } if len(m.keys) > 0 { - log.ZDebug(ctx, "delete cache", "keys", m.keys) + log.ZDebug(ctx, "delete cache", "topic", m.topic, "keys", m.keys) for _, key := range m.keys { for i := 0; i < m.maxRetryTimes; i++ { if err := m.rcClient.TagAsDeleted(key); err != nil { @@ -71,31 +101,18 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { } break } - - //retryTimes := 0 - //for { - // m.rcClient.TagAsDeleted() - // if err := m.rcClient.TagAsDeletedBatch2(ctx, []string{key}); err != nil { - // if retryTimes >= m.maxRetryTimes { - // err = errs.ErrInternalServer.Wrap( - // fmt.Sprintf( - // "delete cache error: %v, keys: %v, retry times %d, please check redis server", - // err, - // key, - // retryTimes, - // ), - // ) - // log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", key) - // return err - // } - // retryTimes++ - // } else { - // break - // } - //} + } + if pk := getPublishKey(m.topic, m.keys); len(pk) > 0 { + data, err := json.Marshal(pk) + if err != nil { + log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", pk) + } else { + if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil { + log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", pk) + } + } } } - return nil } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 9b488c1bf..eb79fdb70 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -38,12 +38,12 @@ const ( conversationUserMinSeq = "CON_USER_MIN_SEQ:" hasReadSeq = "HAS_READ_SEQ:" - appleDeviceToken = "DEVICE_TOKEN" - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - signalCache = "SIGNAL_CACHE:" - signalListCache = "SIGNAL_LIST_CACHE:" - FCM_TOKEN = "FCM_TOKEN:" + //appleDeviceToken = "DEVICE_TOKEN" + getuiToken = "GETUI_TOKEN" + getuiTaskID = "GETUI_TASK_ID" + //signalCache = "SIGNAL_CACHE:" + //signalListCache = "SIGNAL_LIST_CACHE:" + FCM_TOKEN = "FCM_TOKEN:" messageCache = "MESSAGE_CACHE:" messageDelUserList = "MESSAGE_DEL_USER_LIST:" @@ -143,6 +143,10 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string return hasReadSeq + userID + ":" + conversationID } +func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string { + return conversationUserMinSeq + conversationID + "u:" + userID +} + func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) } @@ -208,10 +212,6 @@ func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64, return c.getSeq(ctx, conversationID, c.getMinSeqKey) } -func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string { - return conversationUserMinSeq + conversationID + "u:" + userID -} - func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() if err != nil { diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 837e7a5bc..c081731ce 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -18,6 +18,8 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "hash/crc32" "strconv" "time" @@ -32,8 +34,8 @@ import ( ) const ( - userExpireTime = time.Second * 60 * 60 * 12 - userInfoKey = "USER_INFO:" + userExpireTime = time.Second * 60 * 60 * 12 + //userInfoKey = "USER_INFO:" userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" olineStatusKey = "ONLINE_STATUS:" userOlineStatusExpireTime = time.Second * 60 * 60 * 24 @@ -68,7 +70,11 @@ func NewUserCacheRedis( options rockscache.Options, ) UserCache { rcClient := rockscache.NewClient(rdb, options) - + mc := NewMetaCacheRedis(rcClient) + u := config.Config.LocalCache.User + log.ZDebug(context.Background(), "user local cache init", "Topic", u.Topic, "SlotNum", u.SlotNum, "SlotSize", u.SlotSize, "enable", u.Enable()) + mc.SetTopic(u.Topic) + mc.SetRawRedisClient(rdb) return &UserCacheRedis{ rdb: rdb, metaCache: NewMetaCacheRedis(rcClient), @@ -81,7 +87,7 @@ func NewUserCacheRedis( func (u *UserCacheRedis) NewCache() UserCache { return &UserCacheRedis{ rdb: u.rdb, - metaCache: NewMetaCacheRedis(u.rcClient, u.metaCache.GetPreDelKeys()...), + metaCache: u.Copy(), userDB: u.userDB, expireTime: u.expireTime, rcClient: u.rcClient, @@ -89,18 +95,17 @@ func (u *UserCacheRedis) NewCache() UserCache { } func (u *UserCacheRedis) getUserInfoKey(userID string) string { - return userInfoKey + userID + return cachekey.GetUserInfoKey(userID) } func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string { - return userGlobalRecvMsgOptKey + userID + return cachekey.GetUserGlobalRecvMsgOptKey(userID) } func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) { return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*relationtb.UserModel, error) { return u.userDB.Take(ctx, userID) - }, - ) + }) } func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) { diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go deleted file mode 100644 index 9147fd3ce..000000000 --- a/pkg/common/db/localcache/conversation.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache - -import ( - "context" - "sync" - - "github.com/OpenIMSDK/protocol/conversation" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" -) - -type ConversationLocalCache struct { - lock sync.Mutex - superGroupRecvMsgNotNotifyUserIDs map[string]Hash - conversationIDs map[string]Hash - client *rpcclient.ConversationRpcClient -} - -type Hash struct { - hash uint64 - ids []string -} - -func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache { - return &ConversationLocalCache{ - superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), - conversationIDs: make(map[string]Hash), - client: client, - } -} - -func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := g.client.Client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - return resp.UserIDs, nil -} - -func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { - resp, err := g.client.Client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{ - OwnerUserID: userID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - hash, ok := g.conversationIDs[userID] - g.lock.Unlock() - - if !ok || hash.hash != resp.Hash { - conversationIDsResp, err := g.client.Client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{ - UserID: userID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - defer g.lock.Unlock() - g.conversationIDs[userID] = Hash{ - hash: resp.Hash, - ids: conversationIDsResp.ConversationIDs, - } - - return conversationIDsResp.ConversationIDs, nil - } - - return hash.ids, nil -} diff --git a/pkg/common/db/localcache/doc.go b/pkg/common/db/localcache/doc.go deleted file mode 100644 index d349373ee..000000000 --- a/pkg/common/db/localcache/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go deleted file mode 100644 index 0fdea8642..000000000 --- a/pkg/common/db/localcache/group.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache - -import ( - "context" - "sync" - - "github.com/OpenIMSDK/protocol/group" - "github.com/OpenIMSDK/tools/errs" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" -) - -type GroupLocalCache struct { - lock sync.Mutex - cache map[string]GroupMemberIDsHash - client *rpcclient.GroupRpcClient -} - -type GroupMemberIDsHash struct { - memberListHash uint64 - userIDs []string -} - -func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache { - return &GroupLocalCache{ - cache: make(map[string]GroupMemberIDsHash, 0), - client: client, - } -} - -func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := g.client.Client.GetGroupAbstractInfo(ctx, &group.GetGroupAbstractInfoReq{ - GroupIDs: []string{groupID}, - }) - if err != nil { - return nil, err - } - if len(resp.GroupAbstractInfos) < 1 { - return nil, errs.ErrGroupIDNotFound - } - - g.lock.Lock() - localHashInfo, ok := g.cache[groupID] - if ok && localHashInfo.memberListHash == resp.GroupAbstractInfos[0].GroupMemberListHash { - g.lock.Unlock() - return localHashInfo.userIDs, nil - } - g.lock.Unlock() - - groupMembersResp, err := g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - defer g.lock.Unlock() - g.cache[groupID] = GroupMemberIDsHash{ - memberListHash: resp.GroupAbstractInfos[0].GroupMemberListHash, - userIDs: groupMembersResp.UserIDs, - } - return g.cache[groupID].userIDs, nil -} diff --git a/pkg/common/db/localcache/meta_local_cache.go b/pkg/common/db/localcache/meta_local_cache.go deleted file mode 100644 index ed9389c27..000000000 --- a/pkg/common/db/localcache/meta_local_cache.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache diff --git a/pkg/common/db/mgo/conversation.go b/pkg/common/db/mgo/conversation.go index 35e8a28e6..bc37ed759 100644 --- a/pkg/common/db/mgo/conversation.go +++ b/pkg/common/db/mgo/conversation.go @@ -96,8 +96,14 @@ func (c *ConversationMgo) FindUserIDAllConversations(ctx context.Context, userID return mgoutil.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": userID}) } -func (c *ConversationMgo) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - return mgoutil.Find[string](ctx, c.coll, bson.M{"group_id": groupID, "recv_msg_opt": constant.ReceiveNotNotifyMessage}, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1})) +func (c *ConversationMgo) FindRecvMsgUserIDs(ctx context.Context, conversationID string, recvOpts []int) ([]string, error) { + var filter any + if len(recvOpts) == 0 { + filter = bson.M{"conversation_id": conversationID} + } else { + filter = bson.M{"conversation_id": conversationID, "recv_msg_opt": bson.M{"$in": recvOpts}} + } + return mgoutil.Find[string](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1})) } func (c *ConversationMgo) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index e0a5268ca..583e41c0f 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -53,7 +53,7 @@ type ConversationModelInterface interface { Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error) FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error) - FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) + FindRecvMsgUserIDs(ctx context.Context, conversationID string, recvOpts []int) ([]string, error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error) diff --git a/pkg/common/redispubsub/redispubliser.go b/pkg/common/redispubsub/redispubliser.go new file mode 100644 index 000000000..822b25bf9 --- /dev/null +++ b/pkg/common/redispubsub/redispubliser.go @@ -0,0 +1,16 @@ +package redispubsub + +import "github.com/redis/go-redis/v9" + +type Publisher struct { + client redis.UniversalClient + channel string +} + +func NewPublisher(client redis.UniversalClient, channel string) *Publisher { + return &Publisher{client: client, channel: channel} +} + +func (p *Publisher) Publish(message string) error { + return p.client.Publish(ctx, p.channel, message).Err() +} diff --git a/pkg/common/redispubsub/redissubscriber.go b/pkg/common/redispubsub/redissubscriber.go new file mode 100644 index 000000000..a7029a993 --- /dev/null +++ b/pkg/common/redispubsub/redissubscriber.go @@ -0,0 +1,34 @@ +package redispubsub + +import ( + "context" + "github.com/redis/go-redis/v9" +) + +var ctx = context.Background() + +type Subscriber struct { + client redis.UniversalClient + channel string +} + +func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber { + return &Subscriber{client: client, channel: channel} +} + +func (s *Subscriber) OnMessage(ctx context.Context, callback func(string)) error { + messageChannel := s.client.Subscribe(ctx, s.channel).Channel() + + go func() { + for { + select { + case <-ctx.Done(): + return + case msg := <-messageChannel: + callback(msg.Payload) + } + } + }() + + return nil +} diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go new file mode 100644 index 000000000..4b405b46a --- /dev/null +++ b/pkg/localcache/cache.go @@ -0,0 +1,112 @@ +package localcache + +import ( + "context" + "github.com/openimsdk/localcache/link" + "github.com/openimsdk/localcache/lru" + "hash/fnv" + "unsafe" +) + +type Cache[V any] interface { + Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) + GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) + Del(ctx context.Context, key ...string) + DelLocal(ctx context.Context, key ...string) + Stop() +} + +func New[V any](opts ...Option) Cache[V] { + opt := defaultOption() + for _, o := range opts { + o(opt) + } + + c := cache[V]{opt: opt} + if opt.localSlotNum > 0 && opt.localSlotSize > 0 { + createSimpleLRU := func() lru.LRU[string, V] { + if opt.expirationEvict { + return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + } else { + return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + } + } + if opt.localSlotNum == 1 { + c.local = createSimpleLRU() + } else { + c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, func(key string) uint64 { + h := fnv.New64a() + h.Write(*(*[]byte)(unsafe.Pointer(&key))) + return h.Sum64() + }, createSimpleLRU) + } + if opt.linkSlotNum > 0 { + c.link = link.New(opt.linkSlotNum) + } + } + return &c +} + +type cache[V any] struct { + opt *option + link link.Link + local lru.LRU[string, V] +} + +func (c *cache[V]) onEvict(key string, value V) { + if c.link != nil { + lks := c.link.Del(key) + for k := range lks { + if key != k { // prevent deadlock + c.local.Del(k) + } + } + } +} + +func (c *cache[V]) del(key ...string) { + if c.local == nil { + return + } + for _, k := range key { + c.local.Del(k) + if c.link != nil { + lks := c.link.Del(k) + for k := range lks { + c.local.Del(k) + } + } + } +} + +func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) { + return c.GetLink(ctx, key, fetch) +} + +func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) { + if c.local != nil { + return c.local.Get(key, func() (V, error) { + if len(link) > 0 { + c.link.Link(key, link...) + } + return fetch(ctx) + }) + } else { + return fetch(ctx) + } +} + +func (c *cache[V]) Del(ctx context.Context, key ...string) { + for _, fn := range c.opt.delFn { + fn(ctx, key...) + } + c.del(key...) +} + +func (c *cache[V]) DelLocal(ctx context.Context, key ...string) { + c.del(key...) +} + +func (c *cache[V]) Stop() { + c.local.Stop() +} diff --git a/pkg/localcache/cache_test.go b/pkg/localcache/cache_test.go new file mode 100644 index 000000000..c497b7b4a --- /dev/null +++ b/pkg/localcache/cache_test.go @@ -0,0 +1,79 @@ +package localcache + +import ( + "context" + "fmt" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestName(t *testing.T) { + c := New[string](WithExpirationEvict()) + //c := New[string]() + ctx := context.Background() + + const ( + num = 10000 + tNum = 10000 + kNum = 100000 + pNum = 100 + ) + + getKey := func(v uint64) string { + return fmt.Sprintf("key_%d", v%kNum) + } + + start := time.Now() + t.Log("start", start) + + var ( + get atomic.Int64 + del atomic.Int64 + ) + + incrGet := func() { + if v := get.Add(1); v%pNum == 0 { + //t.Log("#get count", v/pNum) + } + } + incrDel := func() { + if v := del.Add(1); v%pNum == 0 { + //t.Log("@del count", v/pNum) + } + } + + var wg sync.WaitGroup + + for i := 0; i < tNum; i++ { + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < num; i++ { + c.Get(ctx, getKey(rand.Uint64()), func(ctx context.Context) (string, error) { + return fmt.Sprintf("index_%d", i), nil + }) + incrGet() + } + }() + + go func() { + defer wg.Done() + time.Sleep(time.Second / 10) + for i := 0; i < num; i++ { + c.Del(ctx, getKey(rand.Uint64())) + incrDel() + } + }() + } + + wg.Wait() + end := time.Now() + t.Log("end", end) + t.Log("time", end.Sub(start)) + t.Log("get", get.Load()) + t.Log("del", del.Load()) + // 137.35s +} diff --git a/pkg/localcache/go.mod b/pkg/localcache/go.mod new file mode 100644 index 000000000..5f0793042 --- /dev/null +++ b/pkg/localcache/go.mod @@ -0,0 +1,5 @@ +module github.com/openimsdk/localcache + +go 1.19 + +require github.com/hashicorp/golang-lru/v2 v2.0.7 diff --git a/pkg/localcache/link/link.go b/pkg/localcache/link/link.go new file mode 100644 index 000000000..4f238907b --- /dev/null +++ b/pkg/localcache/link/link.go @@ -0,0 +1,109 @@ +package link + +import ( + "hash/fnv" + "sync" + "unsafe" +) + +type Link interface { + Link(key string, link ...string) + Del(key string) map[string]struct{} +} + +func newLinkKey() *linkKey { + return &linkKey{ + data: make(map[string]map[string]struct{}), + } +} + +type linkKey struct { + lock sync.Mutex + data map[string]map[string]struct{} +} + +func (x *linkKey) link(key string, link ...string) { + x.lock.Lock() + defer x.lock.Unlock() + v, ok := x.data[key] + if !ok { + v = make(map[string]struct{}) + x.data[key] = v + } + for _, k := range link { + v[k] = struct{}{} + } +} + +func (x *linkKey) del(key string) map[string]struct{} { + x.lock.Lock() + defer x.lock.Unlock() + ks, ok := x.data[key] + if !ok { + return nil + } + delete(x.data, key) + return ks +} + +func New(n int) Link { + if n <= 0 { + panic("must be greater than 0") + } + slots := make([]*linkKey, n) + for i := 0; i < len(slots); i++ { + slots[i] = newLinkKey() + } + return &slot{ + n: uint64(n), + slots: slots, + } +} + +type slot struct { + n uint64 + slots []*linkKey +} + +func (x *slot) index(s string) uint64 { + h := fnv.New64a() + _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s))) + return h.Sum64() % x.n +} + +func (x *slot) Link(key string, link ...string) { + if len(link) == 0 { + return + } + mk := key + lks := make([]string, len(link)) + for i, k := range link { + lks[i] = k + } + x.slots[x.index(mk)].link(mk, lks...) + for _, lk := range lks { + x.slots[x.index(lk)].link(lk, mk) + } +} + +func (x *slot) Del(key string) map[string]struct{} { + return x.delKey(key) +} + +func (x *slot) delKey(k string) map[string]struct{} { + del := make(map[string]struct{}) + stack := []string{k} + for len(stack) > 0 { + curr := stack[len(stack)-1] + stack = stack[:len(stack)-1] + if _, ok := del[curr]; ok { + continue + } + del[curr] = struct{}{} + childKeys := x.slots[x.index(curr)].del(curr) + for ck := range childKeys { + stack = append(stack, ck) + } + } + return del +} diff --git a/pkg/localcache/link/link_test.go b/pkg/localcache/link/link_test.go new file mode 100644 index 000000000..ed684e693 --- /dev/null +++ b/pkg/localcache/link/link_test.go @@ -0,0 +1,20 @@ +package link + +import ( + "testing" +) + +func TestName(t *testing.T) { + + v := New(1) + + //v.Link("a:1", "b:1", "c:1", "d:1") + v.Link("a:1", "b:1", "c:1") + v.Link("z:1", "b:1") + + //v.DelKey("a:1") + v.Del("z:1") + + t.Log(v) + +} diff --git a/pkg/localcache/lru/lru.go b/pkg/localcache/lru/lru.go new file mode 100644 index 000000000..2995ec028 --- /dev/null +++ b/pkg/localcache/lru/lru.go @@ -0,0 +1,20 @@ +package lru + +import "github.com/hashicorp/golang-lru/v2/simplelru" + +type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V] + +type LRU[K comparable, V any] interface { + Get(key K, fetch func() (V, error)) (V, error) + Del(key K) bool + Stop() +} + +type Target interface { + IncrGetHit() + IncrGetSuccess() + IncrGetFailed() + + IncrDelHit() + IncrDelNotFound() +} diff --git a/pkg/localcache/lru/lru_expiration.go b/pkg/localcache/lru/lru_expiration.go new file mode 100644 index 000000000..3cf61f061 --- /dev/null +++ b/pkg/localcache/lru/lru_expiration.go @@ -0,0 +1,78 @@ +package lru + +import ( + "github.com/hashicorp/golang-lru/v2/expirable" + "sync" + "time" +) + +func NewExpirationLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] { + var cb expirable.EvictCallback[K, *expirationLruItem[V]] + if onEvict != nil { + cb = func(key K, value *expirationLruItem[V]) { + onEvict(key, value.value) + } + } + core := expirable.NewLRU[K, *expirationLruItem[V]](size, cb, successTTL) + return &ExpirationLRU[K, V]{ + core: core, + successTTL: successTTL, + failedTTL: failedTTL, + target: target, + } +} + +type expirationLruItem[V any] struct { + lock sync.RWMutex + err error + value V +} + +type ExpirationLRU[K comparable, V any] struct { + lock sync.Mutex + core *expirable.LRU[K, *expirationLruItem[V]] + successTTL time.Duration + failedTTL time.Duration + target Target +} + +func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { + x.lock.Lock() + v, ok := x.core.Get(key) + if ok { + x.lock.Unlock() + x.target.IncrGetSuccess() + v.lock.RLock() + defer v.lock.RUnlock() + return v.value, v.err + } else { + v = &expirationLruItem[V]{} + x.core.Add(key, v) + v.lock.Lock() + x.lock.Unlock() + defer v.lock.Unlock() + v.value, v.err = fetch() + if v.err == nil { + x.target.IncrGetSuccess() + } else { + x.target.IncrGetFailed() + x.core.Remove(key) + } + return v.value, v.err + } +} + +func (x *ExpirationLRU[K, V]) Del(key K) bool { + x.lock.Lock() + ok := x.core.Remove(key) + x.lock.Unlock() + if ok { + x.target.IncrDelHit() + } else { + x.target.IncrDelNotFound() + } + return ok +} + +func (x *ExpirationLRU[K, V]) Stop() { +} diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go new file mode 100644 index 000000000..a9270ea4a --- /dev/null +++ b/pkg/localcache/lru/lru_lazy.go @@ -0,0 +1,90 @@ +package lru + +import ( + "github.com/hashicorp/golang-lru/v2/simplelru" + "sync" + "time" +) + +type layLruItem[V any] struct { + lock sync.Mutex + expires int64 + err error + value V +} + +func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] { + var cb simplelru.EvictCallback[K, *layLruItem[V]] + if onEvict != nil { + cb = func(key K, value *layLruItem[V]) { + onEvict(key, value.value) + } + } + core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb) + if err != nil { + panic(err) + } + return &LayLRU[K, V]{ + core: core, + successTTL: successTTL, + failedTTL: failedTTL, + target: target, + } +} + +type LayLRU[K comparable, V any] struct { + lock sync.Mutex + core *simplelru.LRU[K, *layLruItem[V]] + successTTL time.Duration + failedTTL time.Duration + target Target +} + +func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { + x.lock.Lock() + v, ok := x.core.Get(key) + if ok { + x.lock.Unlock() + v.lock.Lock() + expires, value, err := v.expires, v.value, v.err + if expires != 0 && expires > time.Now().UnixMilli() { + v.lock.Unlock() + x.target.IncrGetHit() + return value, err + } + } else { + v = &layLruItem[V]{} + x.core.Add(key, v) + v.lock.Lock() + x.lock.Unlock() + } + defer v.lock.Unlock() + if v.expires > time.Now().UnixMilli() { + return v.value, v.err + } + v.value, v.err = fetch() + if v.err == nil { + v.expires = time.Now().Add(x.successTTL).UnixMilli() + x.target.IncrGetSuccess() + } else { + v.expires = time.Now().Add(x.failedTTL).UnixMilli() + x.target.IncrGetFailed() + } + return v.value, v.err +} + +func (x *LayLRU[K, V]) Del(key K) bool { + x.lock.Lock() + ok := x.core.Remove(key) + x.lock.Unlock() + if ok { + x.target.IncrDelHit() + } else { + x.target.IncrDelNotFound() + } + return ok +} + +func (x *LayLRU[K, V]) Stop() { + +} diff --git a/pkg/localcache/lru/lru_lazy_test.go b/pkg/localcache/lru/lru_lazy_test.go new file mode 100644 index 000000000..09fd04cd3 --- /dev/null +++ b/pkg/localcache/lru/lru_lazy_test.go @@ -0,0 +1,104 @@ +package lru + +import ( + "fmt" + "hash/fnv" + "sync" + "sync/atomic" + "testing" + "time" + "unsafe" +) + +type cacheTarget struct { + getHit int64 + getSuccess int64 + getFailed int64 + delHit int64 + delNotFound int64 +} + +func (r *cacheTarget) IncrGetHit() { + atomic.AddInt64(&r.getHit, 1) +} + +func (r *cacheTarget) IncrGetSuccess() { + atomic.AddInt64(&r.getSuccess, 1) +} + +func (r *cacheTarget) IncrGetFailed() { + atomic.AddInt64(&r.getFailed, 1) +} + +func (r *cacheTarget) IncrDelHit() { + atomic.AddInt64(&r.delHit, 1) +} + +func (r *cacheTarget) IncrDelNotFound() { + atomic.AddInt64(&r.delNotFound, 1) +} + +func (r *cacheTarget) String() string { + return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) +} + +func TestName(t *testing.T) { + target := &cacheTarget{} + l := NewSlotLRU[string, string](100, func(k string) uint64 { + h := fnv.New64a() + h.Write(*(*[]byte)(unsafe.Pointer(&k))) + return h.Sum64() + }, func() LRU[string, string] { + return NewExpirationLRU[string, string](100, time.Second*60, time.Second, target, nil) + }) + //l := NewInertiaLRU[string, string](1000, time.Second*20, time.Second*5, target) + + fn := func(key string, n int, fetch func() (string, error)) { + for i := 0; i < n; i++ { + //v, err := l.Get(key, fetch) + //if err == nil { + // t.Log("key", key, "value", v) + //} else { + // t.Error("key", key, err) + //} + v, err := l.Get(key, fetch) + //time.Sleep(time.Second / 100) + func(v ...any) {}(v, err) + } + } + + tmp := make(map[string]struct{}) + + var wg sync.WaitGroup + for i := 0; i < 10000; i++ { + wg.Add(1) + key := fmt.Sprintf("key_%d", i%200) + tmp[key] = struct{}{} + go func() { + defer wg.Done() + //t.Log(key) + fn(key, 10000, func() (string, error) { + //time.Sleep(time.Second * 3) + //t.Log(time.Now(), "key", key, "fetch") + //if rand.Uint32()%5 == 0 { + // return "value_" + key, nil + //} + //return "", errors.New("rand error") + return "value_" + key, nil + }) + }() + + //wg.Add(1) + //go func() { + // defer wg.Done() + // for i := 0; i < 10; i++ { + // l.Del(key) + // time.Sleep(time.Second / 3) + // } + //}() + } + wg.Wait() + t.Log(len(tmp)) + t.Log(target.String()) + +} diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go new file mode 100644 index 000000000..c1b8b94d0 --- /dev/null +++ b/pkg/localcache/lru/lru_slot.go @@ -0,0 +1,37 @@ +package lru + +func NewSlotLRU[K comparable, V any](slotNum int, hash func(K) uint64, create func() LRU[K, V]) LRU[K, V] { + x := &slotLRU[K, V]{ + n: uint64(slotNum), + slots: make([]LRU[K, V], slotNum), + hash: hash, + } + for i := 0; i < slotNum; i++ { + x.slots[i] = create() + } + return x +} + +type slotLRU[K comparable, V any] struct { + n uint64 + slots []LRU[K, V] + hash func(k K) uint64 +} + +func (x *slotLRU[K, V]) getIndex(k K) uint64 { + return x.hash(k) % x.n +} + +func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { + return x.slots[x.getIndex(key)].Get(key, fetch) +} + +func (x *slotLRU[K, V]) Del(key K) bool { + return x.slots[x.getIndex(key)].Del(key) +} + +func (x *slotLRU[K, V]) Stop() { + for _, slot := range x.slots { + slot.Stop() + } +} diff --git a/pkg/localcache/option.go b/pkg/localcache/option.go new file mode 100644 index 000000000..ecb5da0e6 --- /dev/null +++ b/pkg/localcache/option.go @@ -0,0 +1,121 @@ +package localcache + +import ( + "context" + "github.com/openimsdk/localcache/lru" + "time" +) + +func defaultOption() *option { + return &option{ + localSlotNum: 500, + localSlotSize: 20000, + linkSlotNum: 500, + expirationEvict: false, + localSuccessTTL: time.Minute, + localFailedTTL: time.Second * 5, + delFn: make([]func(ctx context.Context, key ...string), 0, 2), + target: emptyTarget{}, + } +} + +type option struct { + localSlotNum int + localSlotSize int + linkSlotNum int + // expirationEvict: true means that the cache will be actively cleared when the timer expires, + // false means that the cache will be lazily deleted. + expirationEvict bool + localSuccessTTL time.Duration + localFailedTTL time.Duration + delFn []func(ctx context.Context, key ...string) + target lru.Target +} + +type Option func(o *option) + +func WithExpirationEvict() Option { + return func(o *option) { + o.expirationEvict = true + } +} + +func WithLazy() Option { + return func(o *option) { + o.expirationEvict = false + } +} + +func WithLocalDisable() Option { + return WithLinkSlotNum(0) +} + +func WithLinkDisable() Option { + return WithLinkSlotNum(0) +} + +func WithLinkSlotNum(linkSlotNum int) Option { + return func(o *option) { + o.linkSlotNum = linkSlotNum + } +} + +func WithLocalSlotNum(localSlotNum int) Option { + return func(o *option) { + o.localSlotNum = localSlotNum + } +} + +func WithLocalSlotSize(localSlotSize int) Option { + return func(o *option) { + o.localSlotSize = localSlotSize + } +} + +func WithLocalSuccessTTL(localSuccessTTL time.Duration) Option { + if localSuccessTTL < 0 { + panic("localSuccessTTL should be greater than 0") + } + return func(o *option) { + o.localSuccessTTL = localSuccessTTL + } +} + +func WithLocalFailedTTL(localFailedTTL time.Duration) Option { + if localFailedTTL < 0 { + panic("localFailedTTL should be greater than 0") + } + return func(o *option) { + o.localFailedTTL = localFailedTTL + } +} + +func WithTarget(target lru.Target) Option { + if target == nil { + panic("target should not be nil") + } + return func(o *option) { + o.target = target + } +} + +func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option { + if fn == nil { + panic("fn should not be nil") + } + return func(o *option) { + o.delFn = append(o.delFn, fn) + } +} + +type emptyTarget struct{} + +func (e emptyTarget) IncrGetHit() {} + +func (e emptyTarget) IncrGetSuccess() {} + +func (e emptyTarget) IncrGetFailed() {} + +func (e emptyTarget) IncrDelHit() {} + +func (e emptyTarget) IncrDelNotFound() {} diff --git a/pkg/localcache/tool.go b/pkg/localcache/tool.go new file mode 100644 index 000000000..ea3590823 --- /dev/null +++ b/pkg/localcache/tool.go @@ -0,0 +1,9 @@ +package localcache + +func AnyValue[V any](v any, err error) (V, error) { + if err != nil { + var zero V + return zero, err + } + return v.(V), nil +} diff --git a/pkg/rpccache/common.go b/pkg/rpccache/common.go new file mode 100644 index 000000000..6dc826e30 --- /dev/null +++ b/pkg/rpccache/common.go @@ -0,0 +1,20 @@ +package rpccache + +func newListMap[V comparable](values []V, err error) (*listMap[V], error) { + if err != nil { + return nil, err + } + lm := &listMap[V]{ + List: values, + Map: make(map[V]struct{}, len(values)), + } + for _, value := range values { + lm.Map[value] = struct{}{} + } + return lm, nil +} + +type listMap[V comparable] struct { + List []V + Map map[V]struct{} +} diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go new file mode 100644 index 000000000..ae77b29b7 --- /dev/null +++ b/pkg/rpccache/conversation.go @@ -0,0 +1,112 @@ +package rpccache + +import ( + "context" + pbconversation "github.com/OpenIMSDK/protocol/conversation" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache { + lc := config.Config.LocalCache.Conversation + log.ZDebug(context.Background(), "ConversationLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) + x := &ConversationLocalCache{ + client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), + localcache.WithLinkSlotNum(lc.SlotNum), + localcache.WithLocalSuccessTTL(lc.Success()), + localcache.WithLocalFailedTTL(lc.Failed()), + ), + } + if lc.Enable() { + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + } + return x +} + +type ConversationLocalCache struct { + client rpcclient.ConversationRpcClient + local localcache.Cache[any] +} + +func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs req", "ownerUserID", ownerUserID) + defer func() { + if err == nil { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs return", "value", val) + } else { + log.ZError(ctx, "ConversationLocalCache GetConversationIDs return", err) + } + }() + return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs rpc", "ownerUserID", ownerUserID) + return c.client.GetConversationIDs(ctx, ownerUserID) + })) +} + +func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, conversationID string) (val *pbconversation.Conversation, err error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversation req", "userID", userID, "conversationID", conversationID) + defer func() { + if err == nil { + log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "value", val) + } else { + log.ZError(ctx, "ConversationLocalCache GetConversation return", err) + } + }() + return localcache.AnyValue[*pbconversation.Conversation](c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversation rpc", "userID", userID, "conversationID", conversationID) + return c.client.GetConversation(ctx, userID, conversationID) + })) +} + +func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) { + conv, err := c.GetConversation(ctx, userID, conversationID) + if err != nil { + return 0, err + } + return conv.RecvMsgOpt, nil +} + +func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { + conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) + for _, conversationID := range conversationIDs { + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + conversations = append(conversations, conversation) + } + return conversations, nil +} + +func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (*listMap[string], error) { + return localcache.AnyValue[*listMap[string]](c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) (any, error) { + return newListMap(c.client.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)) + })) +} + +func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID) + if err != nil { + return nil, err + } + return res.List, nil +} + +func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx context.Context, conversationID string) (map[string]struct{}, error) { + res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID) + if err != nil { + return nil, err + } + return res.Map, nil +} diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go new file mode 100644 index 000000000..66694f618 --- /dev/null +++ b/pkg/rpccache/friend.go @@ -0,0 +1,66 @@ +package rpccache + +import ( + "context" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache { + lc := config.Config.LocalCache.Friend + log.ZDebug(context.Background(), "FriendLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) + x := &FriendLocalCache{ + client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), + localcache.WithLinkSlotNum(lc.SlotNum), + localcache.WithLocalSuccessTTL(lc.Success()), + localcache.WithLocalFailedTTL(lc.Failed()), + ), + } + if lc.Enable() { + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + } + return x +} + +type FriendLocalCache struct { + client rpcclient.FriendRpcClient + local localcache.Cache[any] +} + +func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) { + log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "FriendLocalCache IsFriend return", "value", val) + } else { + log.ZError(ctx, "FriendLocalCache IsFriend return", err) + } + }() + return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "FriendLocalCache IsFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID) + return f.client.IsFriend(ctx, possibleFriendUserID, userID) + }, cachekey.GetFriendIDsKey(possibleFriendUserID))) +} + +// IsBlack possibleBlackUserID selfUserID +func (f *FriendLocalCache) IsBlack(ctx context.Context, possibleBlackUserID, userID string) (val bool, err error) { + log.ZDebug(ctx, "FriendLocalCache IsBlack req", "possibleBlackUserID", possibleBlackUserID, "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "FriendLocalCache IsBlack return", "value", val) + } else { + log.ZError(ctx, "FriendLocalCache IsBlack return", err) + } + }() + return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID) + return f.client.IsBlack(ctx, possibleBlackUserID, userID) + }, cachekey.GetBlackIDsKey(userID))) +} diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go new file mode 100644 index 000000000..b3c666da4 --- /dev/null +++ b/pkg/rpccache/group.go @@ -0,0 +1,143 @@ +package rpccache + +import ( + "context" + "github.com/OpenIMSDK/protocol/sdkws" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache { + lc := config.Config.LocalCache.Group + log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) + x := &GroupLocalCache{ + client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), + localcache.WithLinkSlotNum(lc.SlotNum), + localcache.WithLocalSuccessTTL(lc.Success()), + localcache.WithLocalFailedTTL(lc.Failed()), + ), + } + if lc.Enable() { + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + } + return x +} + +type GroupLocalCache struct { + client rpcclient.GroupRpcClient + local localcache.Cache[any] +} + +func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID) + defer func() { + if err == nil { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val) + } else { + log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err) + } + }() + return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID) + return newListMap(g.client.GetGroupMemberIDs(ctx, groupID)) + })) +} + +func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID string) (val *sdkws.GroupMemberFullInfo, err error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID, "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val) + } else { + log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err) + } + }() + return localcache.AnyValue[*sdkws.GroupMemberFullInfo](g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID) + return g.client.GetGroupMemberCache(ctx, groupID, userID) + })) +} + +func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val *sdkws.GroupInfo, err error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID) + defer func() { + if err == nil { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val) + } else { + log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err) + } + }() + return localcache.AnyValue[*sdkws.GroupInfo](g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID) + return g.client.GetGroupInfoCache(ctx, groupID) + })) +} + +func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { + res, err := g.getGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + return res.List, nil +} + +func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) { + res, err := g.getGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + return res.Map, nil +} + +func (g *GroupLocalCache) GetGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) { + groupInfos := make([]*sdkws.GroupInfo, 0, len(groupIDs)) + for _, groupID := range groupIDs { + groupInfo, err := g.GetGroupInfo(ctx, groupID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + groupInfos = append(groupInfos, groupInfo) + } + return groupInfos, nil +} + +func (g *GroupLocalCache) GetGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) { + members := make([]*sdkws.GroupMemberFullInfo, 0, len(userIDs)) + for _, userID := range userIDs { + member, err := g.GetGroupMember(ctx, groupID, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + members = append(members, member) + } + return members, nil +} + +func (g *GroupLocalCache) GetGroupMemberInfoMap(ctx context.Context, groupID string, userIDs []string) (map[string]*sdkws.GroupMemberFullInfo, error) { + members := make(map[string]*sdkws.GroupMemberFullInfo) + for _, userID := range userIDs { + member, err := g.GetGroupMember(ctx, groupID, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + members[userID] = member + } + return members, nil +} diff --git a/pkg/rpccache/subscriber.go b/pkg/rpccache/subscriber.go new file mode 100644 index 000000000..571ff6d2d --- /dev/null +++ b/pkg/rpccache/subscriber.go @@ -0,0 +1,23 @@ +package rpccache + +import ( + "context" + "encoding/json" + "github.com/OpenIMSDK/tools/log" + "github.com/redis/go-redis/v9" +) + +func subscriberRedisDeleteCache(ctx context.Context, client redis.UniversalClient, channel string, del func(ctx context.Context, key ...string)) { + for message := range client.Subscribe(ctx, channel).Channel() { + log.ZDebug(ctx, "subscriberRedisDeleteCache", "channel", channel, "payload", message.Payload) + var keys []string + if err := json.Unmarshal([]byte(message.Payload), &keys); err != nil { + log.ZError(ctx, "subscriberRedisDeleteCache json.Unmarshal error", err) + continue + } + if len(keys) == 0 { + continue + } + del(ctx, keys...) + } +} diff --git a/pkg/rpccache/user.go b/pkg/rpccache/user.go new file mode 100644 index 000000000..7d6cd5c7e --- /dev/null +++ b/pkg/rpccache/user.go @@ -0,0 +1,97 @@ +package rpccache + +import ( + "context" + "github.com/OpenIMSDK/protocol/sdkws" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewUserLocalCache(client rpcclient.UserRpcClient, cli redis.UniversalClient) *UserLocalCache { + lc := config.Config.LocalCache.User + log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) + x := &UserLocalCache{ + client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), + localcache.WithLinkSlotNum(lc.SlotNum), + localcache.WithLocalSuccessTTL(lc.Success()), + localcache.WithLocalFailedTTL(lc.Failed()), + ), + } + if lc.Enable() { + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + } + return x +} + +type UserLocalCache struct { + client rpcclient.UserRpcClient + local localcache.Cache[any] +} + +func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *sdkws.UserInfo, err error) { + log.ZDebug(ctx, "UserLocalCache GetUserInfo req", "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "UserLocalCache GetUserInfo return", "value", val) + } else { + log.ZError(ctx, "UserLocalCache GetUserInfo return", err) + } + }() + return localcache.AnyValue[*sdkws.UserInfo](u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "UserLocalCache GetUserInfo rpc", "userID", userID) + return u.client.GetUserInfo(ctx, userID) + })) +} + +func (u *UserLocalCache) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (val int32, err error) { + log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt req", "userID", userID) + defer func() { + if err == nil { + log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", "value", val) + } else { + log.ZError(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", err) + } + }() + return localcache.AnyValue[int32](u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID) + return u.client.GetUserGlobalMsgRecvOpt(ctx, userID) + })) +} + +func (u *UserLocalCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) { + users := make([]*sdkws.UserInfo, 0, len(userIDs)) + for _, userID := range userIDs { + user, err := u.GetUserInfo(ctx, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + users = append(users, user) + } + return users, nil +} + +func (u *UserLocalCache) GetUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { + users := make(map[string]*sdkws.UserInfo, len(userIDs)) + for _, userID := range userIDs { + user, err := u.GetUserInfo(ctx, userID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + continue + } + return nil, err + } + users[userID] = user + } + return users, nil +} diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 6981844b6..127e029e1 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -114,6 +114,14 @@ func (c *ConversationRpcClient) GetConversationsByConversationID(ctx context.Con return resp.Conversations, nil } +func (c *ConversationRpcClient) GetConversationOfflinePushUserIDs(ctx context.Context, conversationID string, userIDs []string) ([]string, error) { + resp, err := c.Client.GetConversationOfflinePushUserIDs(ctx, &pbconversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationID, UserIDs: userIDs}) + if err != nil { + return nil, err + } + return resp.UserIDs, nil +} + func (c *ConversationRpcClient) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { if len(conversationIDs) == 0 { return nil, nil @@ -127,3 +135,11 @@ func (c *ConversationRpcClient) GetConversations(ctx context.Context, ownerUserI } return resp.Conversations, nil } + +func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + resp, err := c.Client.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID}) + if err != nil { + return nil, err + } + return resp.UserIDs, nil +} diff --git a/pkg/rpcclient/friend.go b/pkg/rpcclient/friend.go index e1f6ed076..5a5f38698 100644 --- a/pkg/rpcclient/friend.go +++ b/pkg/rpcclient/friend.go @@ -80,7 +80,7 @@ func (f *FriendRpcClient) GetFriendIDs(ctx context.Context, ownerUserID string) return resp.FriendIDs, nil } -func (b *FriendRpcClient) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) { +func (b *FriendRpcClient) IsBlack(ctx context.Context, possibleBlackUserID, userID string) (bool, error) { r, err := b.Client.IsBlack(ctx, &friend.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID}) if err != nil { return false, err diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index 773637858..ec7aab695 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -26,11 +26,9 @@ import ( "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" - "google.golang.org/grpc" ) type Group struct { - conn grpc.ClientConnInterface Client group.GroupClient discov discoveryregistry.SvcDiscoveryRegistry Config *config.GlobalConfig @@ -42,7 +40,7 @@ func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, config *config.Glob util.ExitWithError(err) } client := group.NewGroupClient(conn) - return &Group{discov: discov, conn: conn, Client: client, Config: config} + return &Group{discov: discov, Client: client, Config: config} } type GroupRpcClient Group