|
|
|
@ -20,7 +20,7 @@ import (
|
|
|
|
|
|
|
|
|
|
type MsgChannelValue struct {
|
|
|
|
|
userID string
|
|
|
|
|
msg *pbMsg.MsgDataToMQ
|
|
|
|
|
msg pbMsg.MsgDataToMQ
|
|
|
|
|
}
|
|
|
|
|
type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession)
|
|
|
|
|
type Cmd2Value struct {
|
|
|
|
@ -83,14 +83,29 @@ func (och *OnlineHistoryConsumerHandler) Run() {
|
|
|
|
|
switch cmd.Cmd {
|
|
|
|
|
case Msg:
|
|
|
|
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
|
|
|
|
err := saveUserChat(msgChannelValue.userID, msgChannelValue.msg)
|
|
|
|
|
msg := msgChannelValue.msg
|
|
|
|
|
log.Debug(msg.OperationID, "msg arrived channel", msg.String())
|
|
|
|
|
isSenderSync := utils.GetSwitchFromOptions(msg.MsgData.Options, constant.IsSenderSync)
|
|
|
|
|
//switch msgChannelValue.msg.MsgData.SessionType {
|
|
|
|
|
//case constant.SingleChatType:
|
|
|
|
|
//case constant.GroupChatType:
|
|
|
|
|
//case constant.NotificationChatType:
|
|
|
|
|
//default:
|
|
|
|
|
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
|
|
|
|
// return
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
err := saveUserChat(msgChannelValue.userID, &msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
singleMsgFailedCount++
|
|
|
|
|
log.NewError(msgChannelValue.msg.OperationID, "single data insert to mongo err", err.Error(), msgChannelValue.msg.String())
|
|
|
|
|
log.NewError(msg.OperationID, "single data insert to mongo err", err.Error(), msg.String())
|
|
|
|
|
} else {
|
|
|
|
|
singleMsgSuccessCountMutex.Lock()
|
|
|
|
|
singleMsgSuccessCount++
|
|
|
|
|
singleMsgSuccessCountMutex.Unlock()
|
|
|
|
|
if !(!isSenderSync && msgChannelValue.userID == msg.MsgData.SendID) {
|
|
|
|
|
go sendMessageToPush(&msg, msgChannelValue.userID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -167,7 +182,6 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume
|
|
|
|
|
}
|
|
|
|
|
func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
|
|
|
|
|
msg := cMsg.Value
|
|
|
|
|
now := time.Now()
|
|
|
|
|
msgFromMQ := pbMsg.MsgDataToMQ{}
|
|
|
|
|
err := proto.Unmarshal(msg, &msgFromMQ)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -178,12 +192,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
|
|
|
|
|
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
|
|
|
|
|
//Control whether to store offline messages (mongo)
|
|
|
|
|
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
|
|
|
|
|
//Control whether to store history messages (mysql)
|
|
|
|
|
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
|
|
|
|
|
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
|
|
|
|
|
switch msgFromMQ.MsgData.SessionType {
|
|
|
|
|
case constant.SingleChatType:
|
|
|
|
|
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
|
|
|
|
|
if isHistory {
|
|
|
|
|
seq, err := db.DB.IncrUserSeq(msgKey)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -193,7 +202,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
|
|
|
|
|
sess.MarkMessage(cMsg, "")
|
|
|
|
|
msgFromMQ.MsgData.Seq = uint32(seq)
|
|
|
|
|
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
|
|
|
|
|
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
|
|
|
|
|
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
|
|
|
|
|
//err := saveUserChat(msgKey, &msgFromMQ)
|
|
|
|
|
//if err != nil {
|
|
|
|
|
// singleMsgFailedCount++
|
|
|
|
@ -204,61 +213,11 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
|
|
|
|
|
//singleMsgSuccessCount++
|
|
|
|
|
//singleMsgSuccessCountMutex.Unlock()
|
|
|
|
|
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
|
|
|
|
|
}
|
|
|
|
|
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
|
|
|
|
} else {
|
|
|
|
|
go sendMessageToPush(&msgFromMQ, msgKey)
|
|
|
|
|
}
|
|
|
|
|
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
|
|
|
|
|
case constant.GroupChatType:
|
|
|
|
|
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
|
|
|
|
|
if isHistory {
|
|
|
|
|
seq, err := db.DB.IncrUserSeq(msgKey)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
sess.MarkMessage(cMsg, "")
|
|
|
|
|
msgFromMQ.MsgData.Seq = uint32(seq)
|
|
|
|
|
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
|
|
|
|
|
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
|
|
|
|
|
//err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
|
|
|
|
|
//if err != nil {
|
|
|
|
|
// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
|
|
|
|
|
// return
|
|
|
|
|
//}
|
|
|
|
|
//groupMsgCount++
|
|
|
|
|
}
|
|
|
|
|
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
|
|
|
|
case constant.NotificationChatType:
|
|
|
|
|
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
|
|
|
|
|
if isHistory {
|
|
|
|
|
seq, err := db.DB.IncrUserSeq(msgKey)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
sess.MarkMessage(cMsg, "")
|
|
|
|
|
msgFromMQ.MsgData.Seq = uint32(seq)
|
|
|
|
|
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
|
|
|
|
|
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
|
|
|
|
|
//err := saveUserChat(msgKey, &msgFromMQ)
|
|
|
|
|
//if err != nil {
|
|
|
|
|
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
|
|
|
|
// return
|
|
|
|
|
//}
|
|
|
|
|
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
|
|
|
|
|
}
|
|
|
|
|
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
|
|
|
|
} else {
|
|
|
|
|
if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
|
|
|
|
|
go sendMessageToPush(&msgFromMQ, msgKey)
|
|
|
|
|
}
|
|
|
|
|
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
|
|
|
|
|
default:
|
|
|
|
|
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
|
|
|