From 67319031ea53718e3a3eb6ef63b1c7af5021a265 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 30 Aug 2022 21:22:14 +0800 Subject: [PATCH 1/4] log --- config/config.yaml | 6 +-- internal/msg_gateway/gate/logic.go | 4 +- internal/push/logic/init.go | 17 +++++--- internal/push/logic/push_to_client.go | 62 ++++++++++++++++++++++++++- 4 files changed, 75 insertions(+), 14 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 156b5a94e..55a96f0b8 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -17,9 +17,9 @@ mysql: dbMysqlDatabaseName: openIM_v2 #默认即可 dbTableName: eMsg #默认即可 dbMsgTableNum: 1 - dbMaxOpenConns: 2000 - dbMaxIdleConns: 100 - dbMaxLifeTime: 3600 + dbMaxOpenConns: 100 + dbMaxIdleConns: 10 + dbMaxLifeTime: 5 mongo: dbUri: ""#当dbUri值不为空则直接使用该值 diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index a6f3a1974..2ee9be712 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -153,6 +153,7 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) { ws.pullMsgBySeqListResp(conn, m, nReply) } } + func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullMessageBySeqListResp) { log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String()) c, _ := proto.Marshal(pb) @@ -166,10 +167,9 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM } log.NewInfo(m.OperationID, "pullMsgBySeqListResp all data is ", mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg, len(mReply.Data)) - ws.sendMsg(conn, mReply) - } + func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { sendMsgAllCountLock.Lock() sendMsgAllCount++ diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 6d6229dca..2a147fd27 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -16,22 +16,25 @@ import ( "Open_IM/pkg/common/kafka" "Open_IM/pkg/statistics" "fmt" + "sync" ) var ( - rpcServer RPCServer - pushCh PushConsumerHandler - pushTerminal []int32 - producer *kafka.Producer - offlinePusher pusher.OfflinePusher - successCount uint64 + rpcServer RPCServer + pushCh PushConsumerHandler + pushTerminal []int32 + producer *kafka.Producer + offlinePusher pusher.OfflinePusher + successCount uint64 + CacheGroupMemberUserIDList map[string]*GroupMemberUserIDListHash + CacheGroupMtx sync.RWMutex ) func Init(rpcPort int) { - rpcServer.Init(rpcPort) pushCh.Init() pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID} + CacheGroupMemberUserIDList = make(map[string]*GroupMemberUserIDListHash, 0) } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 766704e1b..04c9ef664 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbCache "Open_IM/pkg/proto/cache" @@ -21,9 +22,9 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "strings" - + "errors" "github.com/golang/protobuf/proto" + "strings" ) type OpenIMContent struct { @@ -38,8 +39,65 @@ type AtContent struct { IsAtSelf bool `json:"isAtSelf"` } +type GroupMemberUserIDListHash struct { + MemberListHash uint64 + UserIDList []string +} + //var grpcCons []*grpc.ClientConn +func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, error) { + groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(groupID) + if err != nil { + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + delete(CacheGroupMemberUserIDList, groupID) + log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] + if ok && groupInLocalCache.MemberListHash == groupHashRemote { + return groupInLocalCache.UserIDList, nil + } + + memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) + if err != nil { + log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} + return memberUserIDListRemote, nil +} + +func GetGroupMemberUserIDListHashFromRemote(groupID string) (uint64, error) { + return rocksCache.GetGroupMemberListHashFromCache(groupID) +} + +func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { + getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} + etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, operationID) + if etcdConn == nil { + errMsg := operationID + "getcdv3.GetDefaultConn == nil" + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + client := pbCache.NewCacheClient(etcdConn) + cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + if err != nil { + log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + return nil, utils.Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") + } + if cacheResp.CommonResp.ErrCode != 0 { + errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + return cacheResp.UserIDList, nil +} + func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingelMsgToUserResultList isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) From 99e913637a84ad9e4ce9ecda63b08213b5cd8bf3 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 30 Aug 2022 21:31:53 +0800 Subject: [PATCH 2/4] add group member cache in push --- internal/push/logic/push_to_client.go | 39 ++++++++++++++++----------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 04c9ef664..451355d7f 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -60,9 +60,10 @@ func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, err defer CacheGroupMtx.Unlock() groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] if ok && groupInLocalCache.MemberListHash == groupHashRemote { + log.Debug(operationID, "in local cache ", groupID) return groupInLocalCache.UserIDList, nil } - + log.Debug(operationID, "not in local cache or hash changed", groupID, " remote hash ", groupHashRemote, " in cache ", ok) memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) if err != nil { log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) @@ -234,24 +235,30 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList) } if len(pushToUserIDList) == 0 { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID} - etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID) - if etcdConn == nil { - errMsg := pushMsg.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pushMsg.OperationID, errMsg) - return - } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + userIDList, err := GetGroupMemberUserIDList(pushMsg.MsgData.GroupID, pushMsg.OperationID) if err != nil { - log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - return - } - if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) + log.Error(pushMsg.OperationID, "GetGroupMemberUserIDList failed ", err.Error(), pushMsg.MsgData.GroupID) return } - pushToUserIDList = cacheResp.UserIDList + pushToUserIDList = userIDList + //getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID} + //etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID) + //if etcdConn == nil { + // errMsg := pushMsg.OperationID + "getcdv3.GetDefaultConn == nil" + // log.NewError(pushMsg.OperationID, errMsg) + // return + //} + //client := pbCache.NewCacheClient(etcdConn) + //cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + //if err != nil { + // log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + // return + //} + //if cacheResp.CommonResp.ErrCode != 0 { + // log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) + // return + //} + //pushToUserIDList = cacheResp.UserIDList } grpcCons := getcdv3.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID) From e5a7810d6e51399ec8995c15acc7f15317687e68 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 30 Aug 2022 22:04:13 +0800 Subject: [PATCH 3/4] add group member cache in msg rpc --- cmd/Open-IM-SDK-Core | 2 +- internal/push/logic/init.go | 17 +++--- internal/push/logic/push_to_client.go | 64 +---------------------- internal/rpc/msg/send_msg.go | 51 ++++++++++-------- internal/utils/local_cache.go | 75 +++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 94 deletions(-) create mode 100644 internal/utils/local_cache.go diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index 5e8d3f536..1667b0f4e 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit 5e8d3f5366700f00db7db2905da27189b9353630 +Subproject commit 1667b0f4e205fc4ed7c690ab55b662087d61c277 diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 2a147fd27..bfe5e1187 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -16,25 +16,22 @@ import ( "Open_IM/pkg/common/kafka" "Open_IM/pkg/statistics" "fmt" - "sync" ) var ( - rpcServer RPCServer - pushCh PushConsumerHandler - pushTerminal []int32 - producer *kafka.Producer - offlinePusher pusher.OfflinePusher - successCount uint64 - CacheGroupMemberUserIDList map[string]*GroupMemberUserIDListHash - CacheGroupMtx sync.RWMutex + rpcServer RPCServer + pushCh PushConsumerHandler + pushTerminal []int32 + producer *kafka.Producer + offlinePusher pusher.OfflinePusher + successCount uint64 ) func Init(rpcPort int) { rpcServer.Init(rpcPort) pushCh.Init() pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID} - CacheGroupMemberUserIDList = make(map[string]*GroupMemberUserIDListHash, 0) + } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 451355d7f..e6e8fd6e4 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -8,13 +8,12 @@ package logic import ( "Open_IM/internal/push" + utils2 "Open_IM/internal/utils" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" - rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" - pbCache "Open_IM/pkg/proto/cache" pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" pbRtc "Open_IM/pkg/proto/rtc" @@ -22,7 +21,6 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "errors" "github.com/golang/protobuf/proto" "strings" ) @@ -39,66 +37,8 @@ type AtContent struct { IsAtSelf bool `json:"isAtSelf"` } -type GroupMemberUserIDListHash struct { - MemberListHash uint64 - UserIDList []string -} - //var grpcCons []*grpc.ClientConn -func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, error) { - groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(groupID) - if err != nil { - CacheGroupMtx.Lock() - defer CacheGroupMtx.Unlock() - delete(CacheGroupMemberUserIDList, groupID) - log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) - return nil, utils.Wrap(err, groupID) - } - - CacheGroupMtx.Lock() - defer CacheGroupMtx.Unlock() - groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] - if ok && groupInLocalCache.MemberListHash == groupHashRemote { - log.Debug(operationID, "in local cache ", groupID) - return groupInLocalCache.UserIDList, nil - } - log.Debug(operationID, "not in local cache or hash changed", groupID, " remote hash ", groupHashRemote, " in cache ", ok) - memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) - if err != nil { - log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) - return nil, utils.Wrap(err, groupID) - } - CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} - return memberUserIDListRemote, nil -} - -func GetGroupMemberUserIDListHashFromRemote(groupID string) (uint64, error) { - return rocksCache.GetGroupMemberListHashFromCache(groupID) -} - -func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} - etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, operationID) - if etcdConn == nil { - errMsg := operationID + "getcdv3.GetDefaultConn == nil" - log.NewError(operationID, errMsg) - return nil, errors.New("errMsg") - } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) - if err != nil { - log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - return nil, utils.Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") - } - if cacheResp.CommonResp.ErrCode != 0 { - errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg - log.NewError(operationID, errMsg) - return nil, errors.New("errMsg") - } - return cacheResp.UserIDList, nil -} - func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingelMsgToUserResultList isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) @@ -235,7 +175,7 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList) } if len(pushToUserIDList) == 0 { - userIDList, err := GetGroupMemberUserIDList(pushMsg.MsgData.GroupID, pushMsg.OperationID) + userIDList, err := utils2.GetGroupMemberUserIDList(pushMsg.MsgData.GroupID, pushMsg.OperationID) if err != nil { log.Error(pushMsg.OperationID, "GetGroupMemberUserIDList failed ", err.Error(), pushMsg.MsgData.GroupID) return diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index ab39a3662..02d8a163e 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -1,6 +1,7 @@ package msg import ( + utils2 "Open_IM/internal/utils" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" @@ -9,7 +10,6 @@ import ( "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" cacheRpc "Open_IM/pkg/proto/cache" - pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" pbChat "Open_IM/pkg/proto/msg" pbRelay "Open_IM/pkg/proto/relay" @@ -139,36 +139,45 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string if groupInfo.GroupType == constant.SuperGroup { return true, 0, "", nil } else { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID} - etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID) - if etcdConn == nil { - errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil" + userIDList, err := utils2.GetGroupMemberUserIDList(data.MsgData.GroupID, data.OperationID) + if err != nil { + errMsg := data.OperationID + err.Error() log.NewError(data.OperationID, errMsg) - //return returnMsg(&replay, pb, 201, errMsg, "", 0) return false, 201, errMsg, nil } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) - if err != nil { - log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0) - return false, 201, err.Error(), nil - } - if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) - //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0) - return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil - } + + // + //getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID} + //etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID) + //if etcdConn == nil { + // errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil" + // log.NewError(data.OperationID, errMsg) + // return false, 201, errMsg, nil + //} + //client := pbCache.NewCacheClient(etcdConn) + // cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + // + // + //if err != nil { + // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0) + // return false, 201, err.Error(), nil + //} + //if cacheResp.CommonResp.ErrCode != 0 { + // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) + // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0) + // return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil + //} if !token_verify.IsManagerUserID(data.MsgData.SendID) { if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { - return true, 0, "", cacheResp.UserIDList + return true, 0, "", userIDList } - if !utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) { + if !utils.IsContain(data.MsgData.SendID, userIDList) { //return returnMsg(&replay, pb, 202, "you are not in group", "", 0) return false, 202, "you are not in group", nil } } - return true, 0, "", cacheResp.UserIDList + return true, 0, "", userIDList } default: return true, 0, "", nil diff --git a/internal/utils/local_cache.go b/internal/utils/local_cache.go new file mode 100644 index 000000000..3be1226a1 --- /dev/null +++ b/internal/utils/local_cache.go @@ -0,0 +1,75 @@ +package utils + +import ( + "Open_IM/pkg/common/config" + rocksCache "Open_IM/pkg/common/db/rocks_cache" + "Open_IM/pkg/common/log" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbCache "Open_IM/pkg/proto/cache" + "Open_IM/pkg/utils" + "context" + "errors" + "strings" + "sync" +) + +type GroupMemberUserIDListHash struct { + MemberListHash uint64 + UserIDList []string +} + +var CacheGroupMemberUserIDList map[string]*GroupMemberUserIDListHash = make(map[string]*GroupMemberUserIDListHash, 0) +var CacheGroupMtx sync.RWMutex + +func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, error) { + groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(groupID) + if err != nil { + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + delete(CacheGroupMemberUserIDList, groupID) + log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] + if ok && groupInLocalCache.MemberListHash == groupHashRemote { + log.Debug(operationID, "in local cache ", groupID) + return groupInLocalCache.UserIDList, nil + } + log.Debug(operationID, "not in local cache or hash changed", groupID, " remote hash ", groupHashRemote, " in cache ", ok) + memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) + if err != nil { + log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} + return memberUserIDListRemote, nil +} + +func GetGroupMemberUserIDListHashFromRemote(groupID string) (uint64, error) { + return rocksCache.GetGroupMemberListHashFromCache(groupID) +} + +func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { + getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} + etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, operationID) + if etcdConn == nil { + errMsg := operationID + "getcdv3.GetDefaultConn == nil" + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + client := pbCache.NewCacheClient(etcdConn) + cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + if err != nil { + log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + return nil, utils.Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") + } + if cacheResp.CommonResp.ErrCode != 0 { + errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + return cacheResp.UserIDList, nil +} From 452904734913c2525352aac02c483abd13276701 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 31 Aug 2022 11:16:44 +0800 Subject: [PATCH 4/4] log for msg_gateway cluster --- internal/msg_gateway/gate/ws_server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 29ec4f8ea..f631a6c33 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -138,7 +138,9 @@ func (ws *WServer) MultiTerminalLoginRemoteChecker(userID string, platformID int } if resp.ErrCode != 0 { log.Error(operationID, "MultiTerminalLoginCheck errCode, errMsg: ", resp.ErrCode, resp.ErrMsg) + continue } + log.Debug(operationID, "MultiTerminalLoginCheck resp ", resp.String()) } }