From e5a7810d6e51399ec8995c15acc7f15317687e68 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 30 Aug 2022 22:04:13 +0800 Subject: [PATCH] add group member cache in msg rpc --- cmd/Open-IM-SDK-Core | 2 +- internal/push/logic/init.go | 17 +++--- internal/push/logic/push_to_client.go | 64 +---------------------- internal/rpc/msg/send_msg.go | 51 ++++++++++-------- internal/utils/local_cache.go | 75 +++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 94 deletions(-) create mode 100644 internal/utils/local_cache.go diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index 5e8d3f536..1667b0f4e 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit 5e8d3f5366700f00db7db2905da27189b9353630 +Subproject commit 1667b0f4e205fc4ed7c690ab55b662087d61c277 diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 2a147fd27..bfe5e1187 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -16,25 +16,22 @@ import ( "Open_IM/pkg/common/kafka" "Open_IM/pkg/statistics" "fmt" - "sync" ) var ( - rpcServer RPCServer - pushCh PushConsumerHandler - pushTerminal []int32 - producer *kafka.Producer - offlinePusher pusher.OfflinePusher - successCount uint64 - CacheGroupMemberUserIDList map[string]*GroupMemberUserIDListHash - CacheGroupMtx sync.RWMutex + rpcServer RPCServer + pushCh PushConsumerHandler + pushTerminal []int32 + producer *kafka.Producer + offlinePusher pusher.OfflinePusher + successCount uint64 ) func Init(rpcPort int) { rpcServer.Init(rpcPort) pushCh.Init() pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID} - CacheGroupMemberUserIDList = make(map[string]*GroupMemberUserIDListHash, 0) + } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 451355d7f..e6e8fd6e4 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -8,13 +8,12 @@ package logic import ( "Open_IM/internal/push" + utils2 "Open_IM/internal/utils" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" - rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" - pbCache "Open_IM/pkg/proto/cache" pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" pbRtc "Open_IM/pkg/proto/rtc" @@ -22,7 +21,6 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "errors" "github.com/golang/protobuf/proto" "strings" ) @@ -39,66 +37,8 @@ type AtContent struct { IsAtSelf bool `json:"isAtSelf"` } -type GroupMemberUserIDListHash struct { - MemberListHash uint64 - UserIDList []string -} - //var grpcCons []*grpc.ClientConn -func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, error) { - groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(groupID) - if err != nil { - CacheGroupMtx.Lock() - defer CacheGroupMtx.Unlock() - delete(CacheGroupMemberUserIDList, groupID) - log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) - return nil, utils.Wrap(err, groupID) - } - - CacheGroupMtx.Lock() - defer CacheGroupMtx.Unlock() - groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] - if ok && groupInLocalCache.MemberListHash == groupHashRemote { - log.Debug(operationID, "in local cache ", groupID) - return groupInLocalCache.UserIDList, nil - } - log.Debug(operationID, "not in local cache or hash changed", groupID, " remote hash ", groupHashRemote, " in cache ", ok) - memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) - if err != nil { - log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) - return nil, utils.Wrap(err, groupID) - } - CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} - return memberUserIDListRemote, nil -} - -func GetGroupMemberUserIDListHashFromRemote(groupID string) (uint64, error) { - return rocksCache.GetGroupMemberListHashFromCache(groupID) -} - -func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} - etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, operationID) - if etcdConn == nil { - errMsg := operationID + "getcdv3.GetDefaultConn == nil" - log.NewError(operationID, errMsg) - return nil, errors.New("errMsg") - } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) - if err != nil { - log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - return nil, utils.Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") - } - if cacheResp.CommonResp.ErrCode != 0 { - errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg - log.NewError(operationID, errMsg) - return nil, errors.New("errMsg") - } - return cacheResp.UserIDList, nil -} - func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingelMsgToUserResultList isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) @@ -235,7 +175,7 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList) } if len(pushToUserIDList) == 0 { - userIDList, err := GetGroupMemberUserIDList(pushMsg.MsgData.GroupID, pushMsg.OperationID) + userIDList, err := utils2.GetGroupMemberUserIDList(pushMsg.MsgData.GroupID, pushMsg.OperationID) if err != nil { log.Error(pushMsg.OperationID, "GetGroupMemberUserIDList failed ", err.Error(), pushMsg.MsgData.GroupID) return diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index ab39a3662..02d8a163e 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -1,6 +1,7 @@ package msg import ( + utils2 "Open_IM/internal/utils" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" @@ -9,7 +10,6 @@ import ( "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" cacheRpc "Open_IM/pkg/proto/cache" - pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" pbChat "Open_IM/pkg/proto/msg" pbRelay "Open_IM/pkg/proto/relay" @@ -139,36 +139,45 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string if groupInfo.GroupType == constant.SuperGroup { return true, 0, "", nil } else { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID} - etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID) - if etcdConn == nil { - errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil" + userIDList, err := utils2.GetGroupMemberUserIDList(data.MsgData.GroupID, data.OperationID) + if err != nil { + errMsg := data.OperationID + err.Error() log.NewError(data.OperationID, errMsg) - //return returnMsg(&replay, pb, 201, errMsg, "", 0) return false, 201, errMsg, nil } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) - if err != nil { - log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0) - return false, 201, err.Error(), nil - } - if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) - //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0) - return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil - } + + // + //getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID} + //etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID) + //if etcdConn == nil { + // errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil" + // log.NewError(data.OperationID, errMsg) + // return false, 201, errMsg, nil + //} + //client := pbCache.NewCacheClient(etcdConn) + // cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + // + // + //if err != nil { + // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0) + // return false, 201, err.Error(), nil + //} + //if cacheResp.CommonResp.ErrCode != 0 { + // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) + // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0) + // return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil + //} if !token_verify.IsManagerUserID(data.MsgData.SendID) { if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { - return true, 0, "", cacheResp.UserIDList + return true, 0, "", userIDList } - if !utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) { + if !utils.IsContain(data.MsgData.SendID, userIDList) { //return returnMsg(&replay, pb, 202, "you are not in group", "", 0) return false, 202, "you are not in group", nil } } - return true, 0, "", cacheResp.UserIDList + return true, 0, "", userIDList } default: return true, 0, "", nil diff --git a/internal/utils/local_cache.go b/internal/utils/local_cache.go new file mode 100644 index 000000000..3be1226a1 --- /dev/null +++ b/internal/utils/local_cache.go @@ -0,0 +1,75 @@ +package utils + +import ( + "Open_IM/pkg/common/config" + rocksCache "Open_IM/pkg/common/db/rocks_cache" + "Open_IM/pkg/common/log" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbCache "Open_IM/pkg/proto/cache" + "Open_IM/pkg/utils" + "context" + "errors" + "strings" + "sync" +) + +type GroupMemberUserIDListHash struct { + MemberListHash uint64 + UserIDList []string +} + +var CacheGroupMemberUserIDList map[string]*GroupMemberUserIDListHash = make(map[string]*GroupMemberUserIDListHash, 0) +var CacheGroupMtx sync.RWMutex + +func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, error) { + groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(groupID) + if err != nil { + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + delete(CacheGroupMemberUserIDList, groupID) + log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] + if ok && groupInLocalCache.MemberListHash == groupHashRemote { + log.Debug(operationID, "in local cache ", groupID) + return groupInLocalCache.UserIDList, nil + } + log.Debug(operationID, "not in local cache or hash changed", groupID, " remote hash ", groupHashRemote, " in cache ", ok) + memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) + if err != nil { + log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} + return memberUserIDListRemote, nil +} + +func GetGroupMemberUserIDListHashFromRemote(groupID string) (uint64, error) { + return rocksCache.GetGroupMemberListHashFromCache(groupID) +} + +func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { + getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} + etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, operationID) + if etcdConn == nil { + errMsg := operationID + "getcdv3.GetDefaultConn == nil" + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + client := pbCache.NewCacheClient(etcdConn) + cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + if err != nil { + log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + return nil, utils.Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") + } + if cacheResp.CommonResp.ErrCode != 0 { + errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + return cacheResp.UserIDList, nil +}