From 39720d401f422422d0519669406f9f6aa6c64b5d Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 27 Aug 2022 12:24:06 +0800 Subject: [PATCH 1/7] fix bug: msg_gateway/gate/ws_server.go:138 --- internal/msg_gateway/gate/ws_server.go | 1 + pkg/grpc-etcdv3/getcdv3/resolver.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index a8565f4b8..f1ecc56e9 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -134,6 +134,7 @@ func (ws *WServer) MultiTerminalLoginRemoteChecker(userID string, platformID int resp, err := client.MultiTerminalLoginCheck(context.Background(), req) if err != nil { log.Error(operationID, "MultiTerminalLoginCheck failed ", err.Error()) + continue } if resp.ErrCode != 0 { log.Error(operationID, "MultiTerminalLoginCheck errCode, errMsg: ", resp.ErrCode, resp.ErrMsg) diff --git a/pkg/grpc-etcdv3/getcdv3/resolver.go b/pkg/grpc-etcdv3/getcdv3/resolver.go index d95909b6d..e77bb626b 100644 --- a/pkg/grpc-etcdv3/getcdv3/resolver.go +++ b/pkg/grpc-etcdv3/getcdv3/resolver.go @@ -282,7 +282,7 @@ func GetDefaultGatewayConn4Unique(schema, etcdaddr, operationID string) []*grpc. if len(grpcConns) > 0 { return grpcConns } - log.NewWarn(operationID, utils.GetSelfFuncName(), " len(grpcConns) < 0 ", schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName) + log.NewWarn(operationID, utils.GetSelfFuncName(), " len(grpcConns) == 0 ", schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName) grpcConns = GetDefaultGatewayConn4UniqueFromcfg(operationID) log.NewDebug(operationID, utils.GetSelfFuncName(), config.Config.RpcRegisterName.OpenImRelayName, grpcConns) return grpcConns From 96814109dc0c5beb606cf21d2d1f8d21af84b4f1 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 27 Aug 2022 15:20:49 +0800 Subject: [PATCH 2/7] fix bug: GetSelfUserInfo for no result --- internal/api/user/user.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/api/user/user.go b/internal/api/user/user.go index b67e63e72..96f38bd72 100644 --- a/internal/api/user/user.go +++ b/internal/api/user/user.go @@ -367,7 +367,7 @@ func GetSelfUserInfo(c *gin.Context) { log.NewInfo(req.OperationID, "GetUserInfo api return ", resp) c.JSON(http.StatusOK, resp) } else { - resp := api.GetSelfUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}} + resp := api.GetSelfUserInfoResp{CommResp: api.CommResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}} log.NewInfo(req.OperationID, "GetUserInfo api return ", resp) c.JSON(http.StatusOK, resp) } From 0f0e99b42eb33ce8f0c60587ad0953db9608607f Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Mon, 29 Aug 2022 09:34:52 +0800 Subject: [PATCH 3/7] merge code : Filter push data by time --- internal/push/logic/push_handler.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index ccc00b480..200120800 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -13,6 +13,7 @@ import ( "Open_IM/pkg/common/log" pbChat "Open_IM/pkg/proto/msg" pbPush "Open_IM/pkg/proto/push" + "Open_IM/pkg/utils" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" ) @@ -43,6 +44,11 @@ func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) { MsgData: msgFromMQ.MsgData, PushToUserID: msgFromMQ.PushToUserID, } + sec := msgFromMQ.MsgData.SendTime / 1000 + nowSec := utils.GetCurrentTimestampBySecond() + if nowSec-sec > 10 { + return + } switch msgFromMQ.MsgData.SessionType { case constant.SuperGroupChatType: MsgToSuperGroupUser(pbData) From 6ff8fc7f87cc63a41ca69e2b3d638b3ca8e89650 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Mon, 29 Aug 2022 09:48:55 +0800 Subject: [PATCH 4/7] merge code : Filter push data by time --- internal/push/logic/push_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index 200120800..f137c09d6 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -65,6 +65,7 @@ func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) ms.msgHandle[msg.Topic](msg.Value) + sess.MarkMessage(msg, "") } return nil } From b401917c194658ad9ee61d9e11419b3f16c4a785 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 1 Sep 2022 21:58:53 +0800 Subject: [PATCH 5/7] cache --- internal/rpc/cache/cache.go | 6 ++++-- pkg/common/db/rocks_cache/rocks_cache.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/rpc/cache/cache.go b/internal/rpc/cache/cache.go index 60cc52d32..510fc94a1 100644 --- a/internal/rpc/cache/cache.go +++ b/internal/rpc/cache/cache.go @@ -3,16 +3,17 @@ package cache import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db/rocks_cache" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbCache "Open_IM/pkg/proto/cache" "Open_IM/pkg/utils" "context" - "google.golang.org/grpc" "net" "strconv" "strings" + + "google.golang.org/grpc" ) type cacheServer struct { @@ -67,6 +68,7 @@ func (s *cacheServer) Run() { log.NewError("0", "RegisterEtcd failed ", err.Error()) return } + go rocksCache.DelKeys() err = srv.Serve(listener) if err != nil { log.NewError("0", "Serve failed ", err.Error()) diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index 103bd51f7..d698029b6 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -36,7 +36,7 @@ const ( conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:" ) -func init() { +func DelKeys() { fmt.Println("init to del old keys") for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache, groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} { From 6dfbb90b8015cb7e3824eed8adce529ab7433974 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 2 Sep 2022 17:22:27 +0800 Subject: [PATCH 6/7] fix signal --- internal/push/logic/push_to_client.go | 6 +++++- pkg/common/db/RedisModel.go | 23 +++++++++++++---------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index c91c93f16..aa3c367e4 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -79,10 +79,14 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } } if pushMsg.MsgData.ContentType == constant.SignalingNotification { - if err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData); err != nil { + isSend, err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData, pushMsg.PushToUserID) + if err != nil { log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData) return } + if !isSend { + return + } } customContent := OpenIMContent{ SessionType: int(pushMsg.MsgData.SessionType), diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index 1f81fc3ef..79db918bc 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -258,10 +258,10 @@ func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID str return nil } -func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData) error { +func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, pushToUserID string) (isSend bool, err error) { req := &pbRtc.SignalReq{} if err := proto.Unmarshal(msg.Content, req); err != nil { - return err + return false, err } //log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String()) var inviteeUserIDList []string @@ -273,37 +273,40 @@ func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData) case *pbRtc.SignalReq_InviteInGroup: inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList isInviteSignal = true + if !utils.IsContain(pushToUserID, inviteeUserIDList) { + return false, nil + } case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: - return errors.New("signalInfo do not need offlinePush") + return false, errors.New("signalInfo do not need offlinePush") default: log2.NewDebug(operationID, utils.GetSelfFuncName(), "req invalid type", string(msg.Content)) - return nil + return false, nil } if isInviteSignal { - log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID list:", inviteeUserIDList) + log2.NewDebug(operationID, utils.GetSelfFuncName(), "invite userID list:", inviteeUserIDList) for _, userID := range inviteeUserIDList { log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID:", userID) timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) if err != nil { - return err + return false, err } keyList := SignalListCache + userID err = d.RDB.LPush(context.Background(), keyList, msg.ClientMsgID).Err() if err != nil { - return err + return false, err } err = d.RDB.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err() if err != nil { - return err + return false, err } key := SignalCache + msg.ClientMsgID err = d.RDB.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err() if err != nil { - return err + return false, err } } } - return nil + return true, nil } func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { From 3fcc2c267815a107cd1e62cc74df50355bea7d02 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 3 Sep 2022 15:48:45 +0800 Subject: [PATCH 7/7] cache for GetDefaultGatewayConn4Unique --- pkg/grpc-etcdv3/getcdv3/resolver.go | 33 +++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/pkg/grpc-etcdv3/getcdv3/resolver.go b/pkg/grpc-etcdv3/getcdv3/resolver.go index e77bb626b..878cabc64 100644 --- a/pkg/grpc-etcdv3/getcdv3/resolver.go +++ b/pkg/grpc-etcdv3/getcdv3/resolver.go @@ -6,7 +6,6 @@ import ( "Open_IM/pkg/utils" "context" "fmt" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -277,8 +276,38 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) { } } +var Conn4UniqueList []*grpc.ClientConn +var Conn4UniqueListMtx sync.RWMutex +var IsUpdateStart bool +var IsUpdateStartMtx sync.RWMutex + func GetDefaultGatewayConn4Unique(schema, etcdaddr, operationID string) []*grpc.ClientConn { - grpcConns := getConn4Unique(schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName) + IsUpdateStartMtx.Lock() + if IsUpdateStart == false { + Conn4UniqueList = getConn4Unique(schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName) + go func() { + for { + select { + case <-time.After(time.Second * time.Duration(30)): + Conn4UniqueListMtx.Lock() + Conn4UniqueList = getConn4Unique(schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName) + Conn4UniqueListMtx.Unlock() + } + } + }() + } + IsUpdateStart = true + IsUpdateStartMtx.Unlock() + + Conn4UniqueListMtx.Lock() + var clientConnList []*grpc.ClientConn + for _, v := range Conn4UniqueList { + clientConnList = append(clientConnList, v) + } + Conn4UniqueListMtx.Unlock() + + //grpcConns := getConn4Unique(schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName) + grpcConns := clientConnList if len(grpcConns) > 0 { return grpcConns }