From 06aae24dada45bb0dfefeab884cdd81ce7f3640e Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 14:12:29 +0800 Subject: [PATCH] log --- internal/msg_transfer/logic/online_history_msg_handler.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 5e1e5f935..b6e717af6 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -205,19 +205,23 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { for { + operationID := utils.OperationIDGenerator() select { + case cmd := <-och.msgDistributionCh: switch cmd.Cmd { case ConsumerMsgs: consumerMessages := cmd.Value.([]*sarama.ConsumerMessage) //Aggregation map[userid]message list for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) + log.Error(operationID, "msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) return } + log.Debug(operationID, "MessagesDistributionHandle ", msgFromMQ.String()) if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, &msgFromMQ) och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM @@ -230,6 +234,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { for userID, v := range och.UserAggregationMsgs { if len(v) >= 0 { channelID := getHashCode(userID) % ChannelNum + log.Debug(operationID, "UserAggregationMsgs ", len(v), channelID, userID) go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages}} }(channelID, userID, v)