From b4cc62db92287348f69e77330635f80615baf67a Mon Sep 17 00:00:00 2001 From: x-shadow-man <1494445739@qq.com> Date: Tue, 16 Aug 2022 19:53:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=81=E7=A8=8B=E6=9B=B4=E6=96=B0=E5=90=8E?= =?UTF-8?q?=E7=9A=84=E4=BB=A3=E7=A0=81=E6=95=B4=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../logic/online_history_msg_handler.go | 287 +++++++++--------- internal/push/logic/init.go | 4 +- internal/push/logic/push_rpc_server.go | 2 +- 3 files changed, 145 insertions(+), 148 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 34f950320..bab499ed6 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -3,18 +3,14 @@ package logic import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" - "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/msg" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" - "context" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "hash/crc32" - "strings" "sync" "time" ) @@ -49,12 +45,12 @@ func (och *OnlineHistoryRedisConsumerHandler) Init(cmdCh chan Cmd2Value) { och.chArrays[i] = make(chan Cmd2Value, 50) go och.Run(i) } - if config.Config.ReliableStorage { - och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo - } else { - och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability - - } + //if config.Config.ReliableStorage { + // och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo + //} else { + // och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability + // + //} och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) @@ -227,117 +223,118 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } } -func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { - msg := cMsg.Value - now := time.Now() - msgFromMQ := pbMsg.MsgDataToMQ{} - err := proto.Unmarshal(msg, &msgFromMQ) - if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) - return - } - operationID := msgFromMQ.OperationID - log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) - //Control whether to store offline messages (mongo) - isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) - //Control whether to store history messages (mysql) - isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) - isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) - switch msgFromMQ.MsgData.SessionType { - case constant.SingleChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) - if isHistory { - err := saveUserChat(msgKey, &msgFromMQ) - if err != nil { - singleMsgFailedCount++ - log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - return - } - singleMsgSuccessCountMutex.Lock() - singleMsgSuccessCount++ - singleMsgSuccessCountMutex.Unlock() - log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { - go sendMessageToPush(&msgFromMQ, msgKey) - } - log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now)) - case constant.GroupChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) - if isHistory { - err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) - if err != nil { - log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) - return - } - groupMsgCount++ - } - go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) - log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now)) - case constant.NotificationChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) - if isHistory { - err := saveUserChat(msgKey, &msgFromMQ) - if err != nil { - log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - return - } - log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { - go sendMessageToPush(&msgFromMQ, msgKey) - } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) - default: - log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) - return - } - sess.MarkMessage(cMsg, "") - log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) -} +//func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { +// msg := cMsg.Value +// now := time.Now() +// msgFromMQ := pbMsg.MsgDataToMQ{} +// err := proto.Unmarshal(msg, &msgFromMQ) +// if err != nil { +// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) +// return +// } +// operationID := msgFromMQ.OperationID +// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) +// //Control whether to store offline messages (mongo) +// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) +// //Control whether to store history messages (mysql) +// isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) +// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) +// switch msgFromMQ.MsgData.SessionType { +// case constant.SingleChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgKey, &msgFromMQ) +// if err != nil { +// singleMsgFailedCount++ +// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) +// return +// } +// singleMsgSuccessCountMutex.Lock() +// singleMsgSuccessCount++ +// singleMsgSuccessCountMutex.Unlock() +// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) +// } +// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { +// } else { +// go sendMessageToPush(&msgFromMQ, msgKey) +// } +// log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now)) +// case constant.GroupChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) +// if err != nil { +// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) +// return +// } +// groupMsgCount++ +// } +// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) +// log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now)) +// +// case constant.NotificationChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgKey, &msgFromMQ) +// if err != nil { +// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) +// return +// } +// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) +// } +// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { +// } else { +// go sendMessageToPush(&msgFromMQ, msgKey) +// } +// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) +// default: +// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) +// return +// } +// sess.MarkMessage(cMsg, "") +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) +//} -func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { - msg := cMsg.Value - msgFromMQ := pbMsg.MsgDataToMQ{} - err := proto.Unmarshal(msg, &msgFromMQ) - if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) - return - } - operationID := msgFromMQ.OperationID - log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) - //Control whether to store offline messages (mongo) - isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) - isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - //och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} - //err := saveUserChat(msgKey, &msgFromMQ) - //if err != nil { - // singleMsgFailedCount++ - // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - // return - //} - //singleMsgSuccessCountMutex.Lock() - //singleMsgSuccessCount++ - //singleMsgSuccessCountMutex.Unlock() - //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } else { - if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { - go sendMessageToPush(&msgFromMQ, msgKey) - } - } -} +//func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { +// msg := cMsg.Value +// msgFromMQ := pbMsg.MsgDataToMQ{} +// err := proto.Unmarshal(msg, &msgFromMQ) +// if err != nil { +// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) +// return +// } +// operationID := msgFromMQ.OperationID +// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) +// //Control whether to store offline messages (mongo) +// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) +// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) +// if isHistory { +// seq, err := db.DB.IncrUserSeq(msgKey) +// if err != nil { +// log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) +// return +// } +// sess.MarkMessage(cMsg, "") +// msgFromMQ.MsgData.Seq = uint32(seq) +// log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) +// //och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} +// //err := saveUserChat(msgKey, &msgFromMQ) +// //if err != nil { +// // singleMsgFailedCount++ +// // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) +// // return +// //} +// //singleMsgSuccessCountMutex.Lock() +// //singleMsgSuccessCount++ +// //singleMsgSuccessCountMutex.Unlock() +// //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) +// } else { +// if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { +// go sendMessageToPush(&msgFromMQ, msgKey) +// } +// } +//} func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } @@ -515,32 +512,32 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG // return nil //} -func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { - log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) - rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} - mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} - grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID) - if grpcConn == nil { - log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) - pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) - if err != nil { - log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) - } - return - } - msgClient := pbPush.NewPushMsgServiceClient(grpcConn) - _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) - if err != nil { - log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) - pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) - if err != nil { - log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) - } - } else { - log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String()) - - } -} +//func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { +// log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) +// rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} +// mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} +// grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID) +// if grpcConn == nil { +// log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) +// pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) +// if err != nil { +// log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) +// } +// return +// } +// msgClient := pbPush.NewPushMsgServiceClient(grpcConn) +// _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) +// if err != nil { +// log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) +// pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) +// if err != nil { +// log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) +// } +// } else { +// log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String()) +// +// } +//} func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) { log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID) diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 6d6229dca..cc9b9f202 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -29,7 +29,7 @@ var ( func Init(rpcPort int) { - rpcServer.Init(rpcPort) + //rpcServer.Init(rpcPort) pushCh.Init() pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID} } @@ -49,6 +49,6 @@ func init() { } func Run() { - go rpcServer.run() + //go rpcServer.run() go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) } diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 08939ef1a..1e70f1764 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -63,7 +63,7 @@ func (r *RPCServer) run() { } } func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPush.PushMsgResp, error) { - //Call push module to send message to the user + //Call push module to send message to the user, but the service is not currently used switch pbData.MsgData.SessionType { case constant.SuperGroupChatType: MsgToSuperGroupUser(pbData)