@ -14,13 +14,14 @@ import (
type MsgChannelValue struct {
userID string
msg pbMsg . MsgDataToMQ
msg List [ ] * pbMsg . MsgDataToMQ
type fcb func ( cMsg * sarama . ConsumerMessage , msgKey string , sess sarama . ConsumerGroupSession )
type Cmd2Value struct {
@ -32,19 +33,25 @@ type OnlineHistoryConsumerHandler struct {
historyConsumerGroup * kfk . MConsumerGroup
cmdCh chan Cmd2Value
msgCh chan Cmd2Value
UserAggregationMsgs map [ string ] [ ] * pbMsg . MsgDataToMQ
chArrays [ ChannelNum ] chan Cmd2Value
msgDistributionCh chan Cmd2Value
func ( och * OnlineHistoryConsumerHandler ) Init ( cmdCh chan Cmd2Value ) {
och . msgHandle = make ( map [ string ] fcb )
och . UserAggregationMsgs = make ( map [ string ] [ ] * pbMsg . MsgDataToMQ , ChannelNum )
och . msgDistributionCh = make ( chan Cmd2Value ) //no buffer channel
go och . MessagesDistributionHandle ( )
och . cmdCh = cmdCh
och . msgCh = make ( chan Cmd2Value , 1000 )
if config . Config . ReliableStorage {
och . msgHandle [ config . Config . Kafka . Ws2mschat . Topic ] = och . handleChatWs2Mongo
} else {
och . msgHandle [ config . Config . Kafka . Ws2mschat . Topic ] = och . handleChatWs2MongoLowReliability
for i := 0 ; i < 10 ; i ++ {
go och . Run ( )
for i := 0 ; i < ChannelNum ; i ++ {
och . chArrays [ i ] = make ( chan Cmd2Value , 1000 )
go och . Run ( i )
och . historyConsumerGroup = kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V0_10_2_0 ,
@ -76,16 +83,29 @@ func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error {
return errors . New ( "send cmd timeout" )
func ( och * OnlineHistoryConsumerHandler ) Run ( ) {
func ( och * OnlineHistoryConsumerHandler ) Run ( channelID int ) {
for {
select {
case cmd := <- och . msgCh :
case cmd := <- och . chArrays[ channelID ] :
switch cmd . Cmd {
case Msg:
case User Me ssa ges :
msgChannelValue := cmd . Value . ( MsgChannelValue )
msg := msgChannelValue . msg
log . Debug ( msg . OperationID , "msg arrived channel" , msg . String ( ) )
isSenderSync := utils . GetSwitchFromOptions ( msg . MsgData . Options , constant . IsSenderSync )
msgList := msgChannelValue . msgList
storageMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 80 )
pushMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 80 )
latestMsgOperationID := msgList [ len ( msgList ) - 1 ] . OperationID
log . Debug ( latestMsgOperationID , "msg arrived channel" , "channel id" , channelID , msgList )
for _ , v := range msgList {
isHistory := utils . GetSwitchFromOptions ( v . MsgData . Options , constant . IsHistory )
isSenderSync := utils . GetSwitchFromOptions ( v . MsgData . Options , constant . IsSenderSync )
if isHistory {
storageMsgList = append ( storageMsgList , v )
if ! ( ! isSenderSync && msgChannelValue . userID == v . MsgData . SendID ) {
pushMsgList = append ( pushMsgList , v )
//switch msgChannelValue.msg.MsgData.SessionType {
//case constant.SingleChatType:
//case constant.GroupChatType:
@ -95,23 +115,132 @@ func (och *OnlineHistoryConsumerHandler) Run() {
// return
err := saveUserChat ( msgChannelValue . userID , & msg )
err := saveUserChat List ( msgChannelValue . userID , storageMsgList , latestMsgOperationID )
if err != nil {
singleMsgFailedCount ++
log . NewError ( msg. OperationID, "single data insert to mongo err" , err . Error ( ) , msg. String ( ) )
singleMsgFailedCount += uint64 ( len ( storageMsgList ) )
log . NewError ( latestMsg OperationID, "single data insert to mongo err" , err . Error ( ) , storageMsgList )
} else {
singleMsgSuccessCountMutex . Lock ( )
singleMsgSuccessCount ++
singleMsgSuccessCount += uint64 ( len ( storageMsgList ) )
singleMsgSuccessCountMutex . Unlock ( )
if ! ( ! isSenderSync && msgChannelValue . userID == msg . MsgData . SendID ) {
go sendMessageToPush ( & msg , msgChannelValue . userID )
for _ , v := range pushMsgList {
sendMessageToPush ( v , msgChannelValue . userID )
func ( och * OnlineHistoryConsumerHandler ) handleChatWs2Mongo ( cMsg * sarama . ConsumerMessage , msgKey string , sess sarama . ConsumerGroupSession ) {
//func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
// msg := cMsg.Value
// now := time.Now()
// msgFromMQ := pbMsg.MsgDataToMQ{}
// err := proto.Unmarshal(msg, &msgFromMQ)
// if err != nil {
// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
// return
// }
// operationID := msgFromMQ.OperationID
// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
// //Control whether to store offline messages (mongo)
// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
// //Control whether to store history messages (mysql)
// isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
// switch msgFromMQ.MsgData.SessionType {
// case constant.SingleChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgKey, &msgFromMQ)
// if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
// }
// singleMsgSuccessCountMutex.Lock()
// singleMsgSuccessCount++
// singleMsgSuccessCountMutex.Unlock()
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
// }
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
// } else {
// go sendMessageToPush(&msgFromMQ, msgKey)
// }
// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
// case constant.GroupChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
// if err != nil {
// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
// return
// }
// groupMsgCount++
// }
// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
// case constant.NotificationChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgKey, &msgFromMQ)
// if err != nil {
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
// }
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
// }
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
// } else {
// go sendMessageToPush(&msgFromMQ, msgKey)
// }
// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
// default:
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return
// }
// sess.MarkMessage(cMsg, "")
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
func ( och * OnlineHistoryConsumerHandler ) MessagesDistributionHandle ( ) {
for {
select {
case cmd := <- och . msgDistributionCh :
switch cmd . Cmd {
case ConsumerMsgs :
consumerMessages := cmd . Value . ( [ ] * sarama . ConsumerMessage )
//Aggregation map[userid]message list
for i := 0 ; i < len ( consumerMessages ) ; i ++ {
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( consumerMessages [ i ] . Value , & msgFromMQ )
if err != nil {
log . Error ( "msg_transfer Unmarshal msg err" , "" , "msg" , string ( consumerMessages [ i ] . Value ) , "err" , err . Error ( ) )
if oldM , ok := och . UserAggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] ; ok {
oldM = append ( oldM , & msgFromMQ )
och . UserAggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = oldM
} else {
m := make ( [ ] * pbMsg . MsgDataToMQ , 100 )
m = append ( m , & msgFromMQ )
och . UserAggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = m
for userID , v := range och . UserAggregationMsgs {
if len ( v ) >= 0 {
channelID := getHashCode ( userID ) % ChannelNum
go func ( cID uint32 , userID string , messages [ ] * pbMsg . MsgDataToMQ ) {
och . chArrays [ cID ] <- Cmd2Value { Cmd : UserMessages , Value : MsgChannelValue { userID : userID , msgList : messages } }
} ( channelID , userID , v )
func ( mc * OnlineHistoryConsumerHandler ) handleChatWs2Mongo ( cMsg * sarama . ConsumerMessage , msgKey string , sess sarama . ConsumerGroupSession ) {
msg := cMsg . Value
now := time . Now ( )
msgFromMQ := pbMsg . MsgDataToMQ { }
@ -146,7 +275,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume
} else {
go sendMessageToPush ( & msgFromMQ , msgKey )
log . NewDebug ( operationID , "save UserChat cost time ", time . Since ( now ) )
log . NewDebug ( operationID , "save SingleMsg cost time ", time . Since ( now ) )
case constant . GroupChatType :
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = GroupChatType" , isHistory , isPersist )
if isHistory {
@ -158,6 +287,8 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume
groupMsgCount ++
go sendMessageToPush ( & msgFromMQ , msgFromMQ . MsgData . RecvID )
log . NewDebug ( operationID , "saveGroupMsg cost time " , time . Since ( now ) )
case constant . NotificationChatType :
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer msg type = NotificationChatType" , isHistory , isPersist )
if isHistory {
@ -180,6 +311,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume
sess . MarkMessage ( cMsg , "" )
log . NewDebug ( msgFromMQ . OperationID , "msg_transfer handle topic data to database success..." , msgFromMQ . String ( ) )
func ( och * OnlineHistoryConsumerHandler ) handleChatWs2MongoLowReliability ( cMsg * sarama . ConsumerMessage , msgKey string , sess sarama . ConsumerGroupSession ) {
msg := cMsg . Value
msgFromMQ := pbMsg . MsgDataToMQ { }
@ -202,7 +334,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
sess . MarkMessage ( cMsg , "" )
msgFromMQ . MsgData . Seq = uint32 ( seq )
log . Debug ( operationID , "send ch msg is " , msgFromMQ . String ( ) )
och . msgCh <- Cmd2Value { Cmd : Msg , Value : MsgChannelValue { msgKey , msgFromMQ } }
//och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// singleMsgFailedCount++
@ -222,19 +354,49 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
func ( OnlineHistoryConsumerHandler ) Setup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( OnlineHistoryConsumerHandler ) Cleanup ( _ sarama . ConsumerGroupSession ) error { return nil }
//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
// log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
// for msg := range claim.Messages() {
// SetOnlineTopicStatus(OnlineTopicBusy)
// //och.TriggerCmd(OnlineTopicBusy)
// log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
// och.msgHandle[msg.Topic](msg, string(msg.Key), sess)
// if claim.HighWaterMarkOffset()-msg.Offset <= 1 {
// log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset)
// SetOnlineTopicStatus(OnlineTopicVacancy)
// och.TriggerCmd(OnlineTopicVacancy)
// }
// }
// return nil
func ( och * OnlineHistoryConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession ,
claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
cMsg := make ( [ ] * sarama . ConsumerMessage , 500 )
t := time . NewTicker ( time . Duration ( 500 ) * time . Millisecond )
for msg := range claim . Messages ( ) {
SetOnlineTopicStatus ( OnlineTopicBusy )
log . NewDebug ( "" , "online kafka get info to mongo" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "online" , msg . Offset , claim . HighWaterMarkOffset ( ) )
och . msgHandle [ msg . Topic ] ( msg , string ( msg . Key ) , sess )
if claim . HighWaterMarkOffset ( ) - msg . Offset <= 1 {
log . Debug ( "" , "online msg consume end" , claim . HighWaterMarkOffset ( ) , msg . Offset )
SetOnlineTopicStatus ( OnlineTopicVacancy )
och . TriggerCmd ( OnlineTopicVacancy )
cMsg = append ( cMsg , msg )
select {
case <- t . C :
if len ( cMsg ) >= 0 {
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : cMsg }
sess . MarkMessage ( msg , "" )
cMsg = cMsg [ 0 : 0 ]
default :
if len ( cMsg ) >= 500 {
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : cMsg }
sess . MarkMessage ( msg , "" )
cMsg = cMsg [ 0 : 0 ]
log . NewDebug ( "" , "online kafka get info to mongo" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "online" , msg . Offset , claim . HighWaterMarkOffset ( ) )
return nil
@ -264,3 +426,12 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
// String hashes a string to a unique hashcode.
// crc32 returns a uint32, but for our use we need
// and non negative integer. Here we cast to an integer
// and invert it if the result is negative.
func getHashCode ( s string ) uint32 {
return crc32 . ChecksumIEEE ( [ ] byte ( s ) )