From 38ab3e0ed791a9d981494ce6b5726f616b0043f4 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Thu, 26 Oct 2023 12:22:43 +0800 Subject: [PATCH] fix: zk add close to avoid zk block. (#1284) * fix: zk add close to avoid zk block. * fix: zk add close to avoid zk block. * fix: zk add close to avoid zk block. * fix: zk add close to avoid zk block. --- go.mod | 2 +- internal/rpc/msg/sync_msg.go | 11 +++++++++++ .../k8s_discovery_register.go | 3 +++ pkg/common/startrpc/start.go | 17 ++++++----------- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 4bd7999b7..4d4ed879b 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.3 github.com/OpenIMSDK/protocol v0.0.30 - github.com/OpenIMSDK/tools v0.0.14 + github.com/OpenIMSDK/tools v0.0.15 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.7.1 diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index aaafb45bb..7c67ff05f 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -62,6 +62,11 @@ func (m *msgServer) PullMessageBySeqs( case sdkws.PullOrder_PullOrderDesc: isEnd = seq.Begin <= minSeq } + if len(msgs) == 0 { + log.ZWarn(ctx, "not have msgs", nil, "conversationID", seq.ConversationID, "seq", seq) + + continue + } resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs, IsEnd: isEnd} } else { var seqs []int64 @@ -71,6 +76,7 @@ func (m *msgServer) PullMessageBySeqs( minSeq, maxSeq, notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, seq.ConversationID, seqs) if err != nil { log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq) + continue } var isEnd bool @@ -80,6 +86,11 @@ func (m *msgServer) PullMessageBySeqs( case sdkws.PullOrder_PullOrderDesc: isEnd = seq.Begin <= minSeq } + if len(notificationMsgs) == 0 { + log.ZWarn(ctx, "not have notificationMsgs", nil, "conversationID", seq.ConversationID, "seq", seq) + + continue + } resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs, IsEnd: isEnd} } } diff --git a/pkg/common/discovery_register/k8s_discovery_register.go b/pkg/common/discovery_register/k8s_discovery_register.go index 70f9f39f3..811d35b96 100644 --- a/pkg/common/discovery_register/k8s_discovery_register.go +++ b/pkg/common/discovery_register/k8s_discovery_register.go @@ -86,3 +86,6 @@ func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") return nil } +func (cli *K8sDR) Close() { + return +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 2aeff3cf0..f04ab2508 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -51,17 +51,12 @@ func Start( return err } defer listener.Close() - zkClient, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) - /* - zkClient, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - zookeeper.WithFreq(time.Hour), zookeeper.WithUserNameAndPassword( - config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password, - ), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/if err != nil { + client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + if err != nil { return utils.Wrap1(err) } - // defer zkClient.CloseZK() - zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + defer client.Close() + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) if err != nil { return err @@ -81,11 +76,11 @@ func Start( } srv := grpc.NewServer(options...) defer srv.GracefulStop() - err = rpcFn(zkClient, srv) + err = rpcFn(client, srv) if err != nil { return utils.Wrap1(err) } - err = zkClient.Register( + err = client.Register( rpcRegisterName, registerIP, rpcPort,