concurrent consumption of messages

pull/232/head
Gordon 3 years ago
parent a2888b009d
commit d086de8103

@ -45,14 +45,20 @@ func (mc *OfflineHistoryConsumerHandler) Run() {
switch cmd.Cmd { switch cmd.Cmd {
case Msg: case Msg:
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
err := saveUserChat(msgChannelValue.userID, msgChannelValue.msg) msg := msgChannelValue.msg
log.Debug(msg.OperationID, "msg arrived channel", msg.String())
isSenderSync := utils.GetSwitchFromOptions(msg.MsgData.Options, constant.IsSenderSync)
err := saveUserChat(msgChannelValue.userID, &msg)
if err != nil { if err != nil {
singleMsgFailedCount++ singleMsgFailedCount++
log.NewError(msgChannelValue.msg.OperationID, "single data insert to mongo err", err.Error(), msgChannelValue.msg.String()) log.NewError(msg.OperationID, "single data insert to mongo err", err.Error(), msg.String())
} else { } else {
singleMsgSuccessCountMutex.Lock() singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount++ singleMsgSuccessCount++
singleMsgSuccessCountMutex.Unlock() singleMsgSuccessCountMutex.Unlock()
if !(!isSenderSync && msgChannelValue.userID == msg.MsgData.SendID) {
go sendMessageToPush(&msg, msgChannelValue.userID)
}
} }
} }
} }
@ -131,7 +137,6 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume
} }
func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value msg := cMsg.Value
now := time.Now()
msgFromMQ := pbMsg.MsgDataToMQ{} msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ) err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil { if err != nil {
@ -142,87 +147,32 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo) //Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
switch msgFromMQ.MsgData.SessionType { if isHistory {
case constant.SingleChatType: seq, err := db.DB.IncrUserSeq(msgKey)
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) if err != nil {
if isHistory { log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
seq, err := db.DB.IncrUserSeq(msgKey) return
if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return
}
sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
//}
//singleMsgSuccessCountMutex.Lock()
//singleMsgSuccessCount++
//singleMsgSuccessCountMutex.Unlock()
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
case constant.GroupChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
if isHistory {
seq, err := db.DB.IncrUserSeq(msgKey)
if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return
}
sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
//err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
//if err != nil {
// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
// return
//}
//groupMsgCount++
} }
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) sess.MarkMessage(cMsg, "")
case constant.NotificationChatType: msgFromMQ.MsgData.Seq = uint32(seq)
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
if isHistory { mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
seq, err := db.DB.IncrUserSeq(msgKey) //err := saveUserChat(msgKey, &msgFromMQ)
if err != nil { //if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) // singleMsgFailedCount++
return // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
} // return
sess.MarkMessage(cMsg, "") //}
msgFromMQ.MsgData.Seq = uint32(seq) //singleMsgSuccessCountMutex.Lock()
log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) //singleMsgSuccessCount++
mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} //singleMsgSuccessCountMutex.Unlock()
//err := saveUserChat(msgKey, &msgFromMQ) //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
//if err != nil { } else {
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
// return
//}
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey) go sendMessageToPush(&msgFromMQ, msgKey)
} }
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
default:
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
return
} }
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
} }
func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

@ -20,7 +20,7 @@ import (
type MsgChannelValue struct { type MsgChannelValue struct {
userID string userID string
msg *pbMsg.MsgDataToMQ msg pbMsg.MsgDataToMQ
} }
type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession)
type Cmd2Value struct { type Cmd2Value struct {
@ -83,14 +83,29 @@ func (och *OnlineHistoryConsumerHandler) Run() {
switch cmd.Cmd { switch cmd.Cmd {
case Msg: case Msg:
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
err := saveUserChat(msgChannelValue.userID, msgChannelValue.msg) msg := msgChannelValue.msg
log.Debug(msg.OperationID, "msg arrived channel", msg.String())
isSenderSync := utils.GetSwitchFromOptions(msg.MsgData.Options, constant.IsSenderSync)
//switch msgChannelValue.msg.MsgData.SessionType {
//case constant.SingleChatType:
//case constant.GroupChatType:
//case constant.NotificationChatType:
//default:
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return
//}
err := saveUserChat(msgChannelValue.userID, &msg)
if err != nil { if err != nil {
singleMsgFailedCount++ singleMsgFailedCount++
log.NewError(msgChannelValue.msg.OperationID, "single data insert to mongo err", err.Error(), msgChannelValue.msg.String()) log.NewError(msg.OperationID, "single data insert to mongo err", err.Error(), msg.String())
} else { } else {
singleMsgSuccessCountMutex.Lock() singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount++ singleMsgSuccessCount++
singleMsgSuccessCountMutex.Unlock() singleMsgSuccessCountMutex.Unlock()
if !(!isSenderSync && msgChannelValue.userID == msg.MsgData.SendID) {
go sendMessageToPush(&msg, msgChannelValue.userID)
}
} }
} }
} }
@ -167,7 +182,6 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume
} }
func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value msg := cMsg.Value
now := time.Now()
msgFromMQ := pbMsg.MsgDataToMQ{} msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ) err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil { if err != nil {
@ -178,87 +192,32 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo) //Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
switch msgFromMQ.MsgData.SessionType { if isHistory {
case constant.SingleChatType: seq, err := db.DB.IncrUserSeq(msgKey)
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) if err != nil {
if isHistory { log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
seq, err := db.DB.IncrUserSeq(msgKey) return
if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return
}
sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
//}
//singleMsgSuccessCountMutex.Lock()
//singleMsgSuccessCount++
//singleMsgSuccessCountMutex.Unlock()
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
case constant.GroupChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
if isHistory {
seq, err := db.DB.IncrUserSeq(msgKey)
if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return
}
sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
//err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
//if err != nil {
// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
// return
//}
//groupMsgCount++
}
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
case constant.NotificationChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
if isHistory {
seq, err := db.DB.IncrUserSeq(msgKey)
if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return
}
sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
//}
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} }
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { sess.MarkMessage(cMsg, "")
} else { msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
//}
//singleMsgSuccessCountMutex.Lock()
//singleMsgSuccessCount++
//singleMsgSuccessCountMutex.Unlock()
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} else {
if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
go sendMessageToPush(&msgFromMQ, msgKey) go sendMessageToPush(&msgFromMQ, msgKey)
} }
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
default:
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
return
} }
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
} }
func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

Loading…
Cancel
Save