From 06726758d218d96330a559378b2b0b15e8b0f700 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Fri, 20 May 2022 13:33:38 +0800 Subject: [PATCH] concurrent consumption of messages --- internal/msg_transfer/logic/init.go | 3 + .../logic/offline_history_msg_handler.go | 167 ++++++++++--- .../logic/online_history_msg_handler.go | 225 +++++++++++++++--- 3 files changed, 332 insertions(+), 63 deletions(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index bef20e8c7..b4cfff6db 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -13,6 +13,9 @@ import ( const OnlineTopicBusy = 1 const OnlineTopicVacancy = 0 const Msg = 2 +const ConsumerMsgs = 3 +const UserMessages = 4 +const ChannelNum = 10 var ( persistentCH PersistentConsumerHandler diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index c5cac9a82..4fdd30577 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -15,22 +15,28 @@ import ( type OfflineHistoryConsumerHandler struct { msgHandle map[string]fcb + historyConsumerGroup *kfk.MConsumerGroup cmdCh chan Cmd2Value msgCh chan Cmd2Value - historyConsumerGroup *kfk.MConsumerGroup + UserAggregationMsgs map[string][]*pbMsg.MsgDataToMQ + chArrays [ChannelNum]chan Cmd2Value + msgDistributionCh chan Cmd2Value } func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { mc.msgHandle = make(map[string]fcb) + mc.UserAggregationMsgs = make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) + mc.msgDistributionCh = make(chan Cmd2Value) //no buffer channel + go mc.MessagesDistributionHandle() mc.cmdCh = cmdCh mc.msgCh = make(chan Cmd2Value, 1000) if config.Config.ReliableStorage { - mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2Mongo + mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo } else { - mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2MongoLowReliability - for i := 0; i < 10; i++ { - go mc.Run() - + mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability + for i := 0; i < ChannelNum; i++ { + mc.chArrays[i] = make(chan Cmd2Value, 1000) + go mc.Run(i) } } mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, @@ -38,32 +44,92 @@ func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) } -func (mc *OfflineHistoryConsumerHandler) Run() { +func (och *OfflineHistoryConsumerHandler) Run(channelID int) { for { select { - case cmd := <-mc.msgCh: + case cmd := <-och.chArrays[channelID]: switch cmd.Cmd { - case Msg: + case UserMessages: msgChannelValue := cmd.Value.(MsgChannelValue) - msg := msgChannelValue.msg - log.Debug(msg.OperationID, "msg arrived channel", msg.String()) - isSenderSync := utils.GetSwitchFromOptions(msg.MsgData.Options, constant.IsSenderSync) - err := saveUserChat(msgChannelValue.userID, &msg) + msgList := msgChannelValue.msgList + storageMsgList := make([]*pbMsg.MsgDataToMQ, 80) + pushMsgList := make([]*pbMsg.MsgDataToMQ, 80) + latestMsgOperationID := msgList[len(msgList)-1].OperationID + log.Debug(latestMsgOperationID, "msg arrived channel", "channel id", channelID, msgList) + for _, v := range msgList { + isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) + isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) + if isHistory { + storageMsgList = append(storageMsgList, v) + } + if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { + pushMsgList = append(pushMsgList, v) + } + } + + //switch msgChannelValue.msg.MsgData.SessionType { + //case constant.SingleChatType: + //case constant.GroupChatType: + //case constant.NotificationChatType: + //default: + // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) + // return + //} + + err := saveUserChatList(msgChannelValue.userID, storageMsgList, latestMsgOperationID) if err != nil { - singleMsgFailedCount++ - log.NewError(msg.OperationID, "single data insert to mongo err", err.Error(), msg.String()) + singleMsgFailedCount += uint64(len(storageMsgList)) + log.NewError(latestMsgOperationID, "single data insert to mongo err", err.Error(), storageMsgList) } else { singleMsgSuccessCountMutex.Lock() - singleMsgSuccessCount++ + singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() - if !(!isSenderSync && msgChannelValue.userID == msg.MsgData.SendID) { - go sendMessageToPush(&msg, msgChannelValue.userID) + for _, v := range pushMsgList { + sendMessageToPush(v, msgChannelValue.userID) } + } } } } } +func (och *OfflineHistoryConsumerHandler) MessagesDistributionHandle() { + for { + select { + case cmd := <-och.msgDistributionCh: + switch cmd.Cmd { + case ConsumerMsgs: + consumerMessages := cmd.Value.([]*sarama.ConsumerMessage) + //Aggregation map[userid]message list + for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) + return + } + if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + oldM = append(oldM, &msgFromMQ) + och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + } else { + m := make([]*pbMsg.MsgDataToMQ, 100) + m = append(m, &msgFromMQ) + och.UserAggregationMsgs[string(consumerMessages[i].Key)] = m + } + } + for userID, v := range och.UserAggregationMsgs { + if len(v) >= 0 { + channelID := getHashCode(userID) % ChannelNum + go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages}} + }(channelID, userID, v) + } + } + } + } + } + +} func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value now := time.Now() @@ -157,7 +223,7 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg * sess.MarkMessage(cMsg, "") msgFromMQ.MsgData.Seq = uint32(seq) log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} + //mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} //err := saveUserChat(msgKey, &msgFromMQ) //if err != nil { // singleMsgFailedCount++ @@ -177,27 +243,56 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg * func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + +//func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group +// //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) +// //for msg := range claim.Messages() { +// // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") +// // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) +// //} +// for msg := range claim.Messages() { +// if GetOnlineTopicStatus() == OnlineTopicVacancy { +// log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) +// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) +// } else { +// select { +// case <-mc.cmdCh: +// log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) +// case <-time.After(time.Millisecond * time.Duration(100)): +// log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) +// } +// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) +// } +// } +// +// return nil +//} +func (och *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - //for msg := range claim.Messages() { - // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") - // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - //} + log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + cMsg := make([]*sarama.ConsumerMessage, 500) + t := time.NewTicker(time.Duration(500) * time.Millisecond) for msg := range claim.Messages() { - if GetOnlineTopicStatus() == OnlineTopicVacancy { - log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) - } else { - select { - case <-mc.cmdCh: - log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - case <-time.After(time.Millisecond * time.Duration(100)): - log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + //och.TriggerCmd(OnlineTopicBusy) + cMsg = append(cMsg, msg) + select { + case <-t.C: + if len(cMsg) >= 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + sess.MarkMessage(msg, "") + cMsg = cMsg[0:0] + } + default: + if len(cMsg) >= 500 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + sess.MarkMessage(msg, "") + cMsg = cMsg[0:0] } - mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) + } - } + log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + } return nil } diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index ea787c107..248e55bd6 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -14,13 +14,14 @@ import ( "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" + "hash/crc32" "strings" "time" ) type MsgChannelValue struct { - userID string - msg pbMsg.MsgDataToMQ + userID string + msgList []*pbMsg.MsgDataToMQ } type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) type Cmd2Value struct { @@ -32,19 +33,25 @@ type OnlineHistoryConsumerHandler struct { historyConsumerGroup *kfk.MConsumerGroup cmdCh chan Cmd2Value msgCh chan Cmd2Value + UserAggregationMsgs map[string][]*pbMsg.MsgDataToMQ + chArrays [ChannelNum]chan Cmd2Value + msgDistributionCh chan Cmd2Value } func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle = make(map[string]fcb) + och.UserAggregationMsgs = make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) + och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel + go och.MessagesDistributionHandle() och.cmdCh = cmdCh och.msgCh = make(chan Cmd2Value, 1000) if config.Config.ReliableStorage { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo } else { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability - for i := 0; i < 10; i++ { - go och.Run() - + for i := 0; i < ChannelNum; i++ { + och.chArrays[i] = make(chan Cmd2Value, 1000) + go och.Run(i) } } och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, @@ -76,16 +83,29 @@ func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { return errors.New("send cmd timeout") } } -func (och *OnlineHistoryConsumerHandler) Run() { +func (och *OnlineHistoryConsumerHandler) Run(channelID int) { for { select { - case cmd := <-och.msgCh: + case cmd := <-och.chArrays[channelID]: switch cmd.Cmd { - case Msg: + case UserMessages: msgChannelValue := cmd.Value.(MsgChannelValue) - msg := msgChannelValue.msg - log.Debug(msg.OperationID, "msg arrived channel", msg.String()) - isSenderSync := utils.GetSwitchFromOptions(msg.MsgData.Options, constant.IsSenderSync) + msgList := msgChannelValue.msgList + storageMsgList := make([]*pbMsg.MsgDataToMQ, 80) + pushMsgList := make([]*pbMsg.MsgDataToMQ, 80) + latestMsgOperationID := msgList[len(msgList)-1].OperationID + log.Debug(latestMsgOperationID, "msg arrived channel", "channel id", channelID, msgList) + for _, v := range msgList { + isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) + isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) + if isHistory { + storageMsgList = append(storageMsgList, v) + } + if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { + pushMsgList = append(pushMsgList, v) + } + } + //switch msgChannelValue.msg.MsgData.SessionType { //case constant.SingleChatType: //case constant.GroupChatType: @@ -95,23 +115,132 @@ func (och *OnlineHistoryConsumerHandler) Run() { // return //} - err := saveUserChat(msgChannelValue.userID, &msg) + err := saveUserChatList(msgChannelValue.userID, storageMsgList, latestMsgOperationID) if err != nil { - singleMsgFailedCount++ - log.NewError(msg.OperationID, "single data insert to mongo err", err.Error(), msg.String()) + singleMsgFailedCount += uint64(len(storageMsgList)) + log.NewError(latestMsgOperationID, "single data insert to mongo err", err.Error(), storageMsgList) } else { singleMsgSuccessCountMutex.Lock() - singleMsgSuccessCount++ + singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() - if !(!isSenderSync && msgChannelValue.userID == msg.MsgData.SendID) { - go sendMessageToPush(&msg, msgChannelValue.userID) + for _, v := range pushMsgList { + sendMessageToPush(v, msgChannelValue.userID) } + } } } } } -func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { + +//func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { +// msg := cMsg.Value +// now := time.Now() +// msgFromMQ := pbMsg.MsgDataToMQ{} +// err := proto.Unmarshal(msg, &msgFromMQ) +// if err != nil { +// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) +// return +// } +// operationID := msgFromMQ.OperationID +// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) +// //Control whether to store offline messages (mongo) +// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) +// //Control whether to store history messages (mysql) +// isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) +// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) +// switch msgFromMQ.MsgData.SessionType { +// case constant.SingleChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgKey, &msgFromMQ) +// if err != nil { +// singleMsgFailedCount++ +// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) +// return +// } +// singleMsgSuccessCountMutex.Lock() +// singleMsgSuccessCount++ +// singleMsgSuccessCountMutex.Unlock() +// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) +// } +// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { +// } else { +// go sendMessageToPush(&msgFromMQ, msgKey) +// } +// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) +// case constant.GroupChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) +// if err != nil { +// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) +// return +// } +// groupMsgCount++ +// } +// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) +// case constant.NotificationChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgKey, &msgFromMQ) +// if err != nil { +// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) +// return +// } +// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) +// } +// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { +// } else { +// go sendMessageToPush(&msgFromMQ, msgKey) +// } +// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) +// default: +// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) +// return +// } +// sess.MarkMessage(cMsg, "") +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) +//} + +func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { + for { + select { + case cmd := <-och.msgDistributionCh: + switch cmd.Cmd { + case ConsumerMsgs: + consumerMessages := cmd.Value.([]*sarama.ConsumerMessage) + //Aggregation map[userid]message list + for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) + return + } + if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + oldM = append(oldM, &msgFromMQ) + och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + } else { + m := make([]*pbMsg.MsgDataToMQ, 100) + m = append(m, &msgFromMQ) + och.UserAggregationMsgs[string(consumerMessages[i].Key)] = m + } + } + for userID, v := range och.UserAggregationMsgs { + if len(v) >= 0 { + channelID := getHashCode(userID) % ChannelNum + go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages}} + }(channelID, userID, v) + } + } + } + } + } + +} +func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} @@ -146,7 +275,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume } else { go sendMessageToPush(&msgFromMQ, msgKey) } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now)) case constant.GroupChatType: log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) if isHistory { @@ -158,6 +287,8 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume groupMsgCount++ } go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) + log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now)) + case constant.NotificationChatType: log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) if isHistory { @@ -180,6 +311,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consume sess.MarkMessage(cMsg, "") log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } + func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value msgFromMQ := pbMsg.MsgDataToMQ{} @@ -202,7 +334,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg * sess.MarkMessage(cMsg, "") msgFromMQ.MsgData.Seq = uint32(seq) log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} + //och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} //err := saveUserChat(msgKey, &msgFromMQ) //if err != nil { // singleMsgFailedCount++ @@ -222,19 +354,49 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg * func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + +//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group +// log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) +// for msg := range claim.Messages() { +// SetOnlineTopicStatus(OnlineTopicBusy) +// //och.TriggerCmd(OnlineTopicBusy) +// log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) +// och.msgHandle[msg.Topic](msg, string(msg.Key), sess) +// if claim.HighWaterMarkOffset()-msg.Offset <= 1 { +// log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset) +// SetOnlineTopicStatus(OnlineTopicVacancy) +// och.TriggerCmd(OnlineTopicVacancy) +// } +// } +// return nil +//} + func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + cMsg := make([]*sarama.ConsumerMessage, 500) + t := time.NewTicker(time.Duration(500) * time.Millisecond) for msg := range claim.Messages() { - SetOnlineTopicStatus(OnlineTopicBusy) //och.TriggerCmd(OnlineTopicBusy) - log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) - och.msgHandle[msg.Topic](msg, string(msg.Key), sess) - if claim.HighWaterMarkOffset()-msg.Offset <= 1 { - log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset) - SetOnlineTopicStatus(OnlineTopicVacancy) - och.TriggerCmd(OnlineTopicVacancy) + cMsg = append(cMsg, msg) + select { + case <-t.C: + if len(cMsg) >= 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + sess.MarkMessage(msg, "") + cMsg = cMsg[0:0] + } + default: + if len(cMsg) >= 500 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + sess.MarkMessage(msg, "") + cMsg = cMsg[0:0] + } + } + log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + } return nil } @@ -264,3 +426,12 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { } } + +// String hashes a string to a unique hashcode. +// +// crc32 returns a uint32, but for our use we need +// and non negative integer. Here we cast to an integer +// and invert it if the result is negative. +func getHashCode(s string) uint32 { + return crc32.ChecksumIEEE([]byte(s)) +}