From 94cd17909d0cc9039c54ebba8d8d98b090e6471a Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 21:23:10 +0800 Subject: [PATCH] channelNum --- .../logic/online_history_msg_handler.go | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 48f190dde..7434dfd5d 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -395,28 +395,6 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string - for msg := range claim.Messages() { - //msgFromMQ := pbMsg.MsgDataToMQ{} - //err := proto.Unmarshal(msg.Value, &msgFromMQ) - //if err != nil { - // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) - //} - //userID := string(msg.Key) - //hashCode := getHashCode(userID) - //channelID := hashCode % ChannelNum - //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) - ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} - //sess.MarkMessage(msg, "") - rwLock.Lock() - cMsg = append(cMsg, msg) - rwLock.Unlock() - sess.MarkMessage(msg, "") - //och.TriggerCmd(OnlineTopicBusy) - - //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) - - } go func() { for { select { @@ -468,6 +446,28 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } }() + for msg := range claim.Messages() { + //msgFromMQ := pbMsg.MsgDataToMQ{} + //err := proto.Unmarshal(msg.Value, &msgFromMQ) + //if err != nil { + // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) + //} + //userID := string(msg.Key) + //hashCode := getHashCode(userID) + //channelID := hashCode % ChannelNum + //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} + //sess.MarkMessage(msg, "") + rwLock.Lock() + cMsg = append(cMsg, msg) + rwLock.Unlock() + sess.MarkMessage(msg, "") + //och.TriggerCmd(OnlineTopicBusy) + + //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + + } return nil }