From 1e62ddce738f04a050e0fbf58715cba0453bb685 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 15 Jun 2022 11:31:24 +0800 Subject: [PATCH] super update --- internal/msg_gateway/gate/logic.go | 3 +-- internal/push/logic/push_to_client.go | 2 +- internal/rpc/msg/send_msg.go | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 35059ec5b..e12062338 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -3,7 +3,6 @@ package gate import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" @@ -93,7 +92,7 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { } } -func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) { +func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) { log.Debug(m.OperationID, "getSeqResp come here ", pb.String()) b, _ := proto.Marshal(pb) mReply := Resp{ diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 909a97d8c..450f57fb0 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -51,7 +51,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) for _, v := range grpcCons { msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) - reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}}) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}}) if err != nil { log.NewError("SuperGroupOnlineBatchPushOneMsg push data to client rpc err", pushMsg.OperationID, "err", err) continue diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index aa6506f7c..0419458e6 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -431,7 +431,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { switch status { case constant.OnlineStatus: - pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } else { @@ -439,7 +439,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str } return err case constant.OfflineStatus: - pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) }