diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index cf3f206cb..c5cac9a82 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -45,14 +45,20 @@ func (mc *OfflineHistoryConsumerHandler) Run() { switch cmd.Cmd { case Msg: msgChannelValue := cmd.Value.(MsgChannelValue) - err := saveUserChat(msgChannelValue.userID, msgChannelValue.msg) + msg := msgChannelValue.msg + log.Debug(msg.OperationID, "msg arrived channel", msg.String()) + isSenderSync := utils.GetSwitchFromOptions(msg.MsgData.Options, constant.IsSenderSync) + err := saveUserChat(msgChannelValue.userID, &msg) if err != nil { singleMsgFailedCount++ - log.NewError(msgChannelValue.msg.OperationID, "single data insert to mongo err", err.Error(), msgChannelValue.msg.String()) + log.NewError(msg.OperationID, "single data insert to mongo err", err.Error(), msg.String()) } else { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount++ singleMsgSuccessCountMutex.Unlock() + if !(!isSenderSync && msgChannelValue.userID == msg.MsgData.SendID) { + go sendMessageToPush(&msg, msgChannelValue.userID) + } } } } @@ -131,7 +137,6 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume } func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value - now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { @@ -142,87 +147,32 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg * log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) //Control whether to store offline messages (mongo) isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) - //Control whether to store history messages (mysql) - isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) - switch msgFromMQ.MsgData.SessionType { - case constant.SingleChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} - //err := saveUserChat(msgKey, &msgFromMQ) - //if err != nil { - // singleMsgFailedCount++ - // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - // return - //} - //singleMsgSuccessCountMutex.Lock() - //singleMsgSuccessCount++ - //singleMsgSuccessCountMutex.Unlock() - //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { - go sendMessageToPush(&msgFromMQ, msgKey) - } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) - case constant.GroupChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} - //err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) - //if err != nil { - // log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) - // return - //} - //groupMsgCount++ + if isHistory { + seq, err := db.DB.IncrUserSeq(msgKey) + if err != nil { + log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) + return } - go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) - case constant.NotificationChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} - //err := saveUserChat(msgKey, &msgFromMQ) - //if err != nil { - // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - // return - //} - //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { + sess.MarkMessage(cMsg, "") + msgFromMQ.MsgData.Seq = uint32(seq) + log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) + mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} + //err := saveUserChat(msgKey, &msgFromMQ) + //if err != nil { + // singleMsgFailedCount++ + // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + // return + //} + //singleMsgSuccessCountMutex.Lock() + //singleMsgSuccessCount++ + //singleMsgSuccessCountMutex.Unlock() + //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } else { + if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { go sendMessageToPush(&msgFromMQ, msgKey) } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) - default: - log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) - return } - log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index fb94ffe5f..ea787c107 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -20,7 +20,7 @@ import ( type MsgChannelValue struct { userID string - msg *pbMsg.MsgDataToMQ + msg pbMsg.MsgDataToMQ } type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) type Cmd2Value struct { @@ -83,14 +83,29 @@ func (och *OnlineHistoryConsumerHandler) Run() { switch cmd.Cmd { case Msg: msgChannelValue := cmd.Value.(MsgChannelValue) - err := saveUserChat(msgChannelValue.userID, msgChannelValue.msg) + msg := msgChannelValue.msg + log.Debug(msg.OperationID, "msg arrived channel", msg.String()) + isSenderSync := utils.GetSwitchFromOptions(msg.MsgData.Options, constant.IsSenderSync) + //switch msgChannelValue.msg.MsgData.SessionType { + //case constant.SingleChatType: + //case constant.GroupChatType: + //case constant.NotificationChatType: + //default: + // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) + // return + //} + + err := saveUserChat(msgChannelValue.userID, &msg) if err != nil { singleMsgFailedCount++ - log.NewError(msgChannelValue.msg.OperationID, "single data insert to mongo err", err.Error(), msgChannelValue.msg.String()) + log.NewError(msg.OperationID, "single data insert to mongo err", err.Error(), msg.String()) } else { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount++ singleMsgSuccessCountMutex.Unlock() + if !(!isSenderSync && msgChannelValue.userID == msg.MsgData.SendID) { + go sendMessageToPush(&msg, msgChannelValue.userID) + } } } } @@ -167,7 +182,6 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume } func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value - now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { @@ -178,87 +192,32 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg * log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) //Control whether to store offline messages (mongo) isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) - //Control whether to store history messages (mysql) - isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) - switch msgFromMQ.MsgData.SessionType { - case constant.SingleChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} - //err := saveUserChat(msgKey, &msgFromMQ) - //if err != nil { - // singleMsgFailedCount++ - // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - // return - //} - //singleMsgSuccessCountMutex.Lock() - //singleMsgSuccessCount++ - //singleMsgSuccessCountMutex.Unlock() - //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { - go sendMessageToPush(&msgFromMQ, msgKey) - } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) - case constant.GroupChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} - //err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) - //if err != nil { - // log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) - // return - //} - //groupMsgCount++ - } - go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) - case constant.NotificationChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} - //err := saveUserChat(msgKey, &msgFromMQ) - //if err != nil { - // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - // return - //} - //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + if isHistory { + seq, err := db.DB.IncrUserSeq(msgKey) + if err != nil { + log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) + return } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { + sess.MarkMessage(cMsg, "") + msgFromMQ.MsgData.Seq = uint32(seq) + log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) + och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} + //err := saveUserChat(msgKey, &msgFromMQ) + //if err != nil { + // singleMsgFailedCount++ + // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + // return + //} + //singleMsgSuccessCountMutex.Lock() + //singleMsgSuccessCount++ + //singleMsgSuccessCountMutex.Unlock() + //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } else { + if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { go sendMessageToPush(&msgFromMQ, msgKey) } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) - default: - log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) - return } - log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }