diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 96412fa40..f8c424dcc 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -16,7 +16,7 @@ const Msg = 2 const ConsumerMsgs = 3 const UserMessages = 4 const MongoMessages = 5 -const ChannelNum = 10 +const ChannelNum = 100 var ( persistentCH PersistentConsumerHandler diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 8e8941d62..4e1242eff 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -42,13 +42,13 @@ type OnlineHistoryConsumerHandler struct { msgCh chan Cmd2Value chArrays [ChannelNum]chan Cmd2Value chMongoArrays [ChannelNum]chan Cmd2Value - //msgDistributionCh chan Cmd2Value + msgDistributionCh chan Cmd2Value } func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle = make(map[string]fcb) - //och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel - //go och.MessagesDistributionHandle() + och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel + go och.MessagesDistributionHandle() och.cmdCh = cmdCh och.msgCh = make(chan Cmd2Value, 1000) for i := 0; i < ChannelNum; i++ { @@ -202,52 +202,52 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { } } -//func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { -// for { -// UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) -// select { -// case cmd := <-och.msgDistributionCh: -// switch cmd.Cmd { -// case ConsumerMsgs: -// triggerChannelValue := cmd.Value.(TriggerChannelValue) -// triggerID := triggerChannelValue.triggerID -// consumerMessages := triggerChannelValue.cmsgList -// //Aggregation map[userid]message list -// log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) -// for i := 0; i < len(consumerMessages); i++ { -// msgFromMQ := pbMsg.MsgDataToMQ{} -// err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) -// if err != nil { -// log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) -// return -// } -// log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) -// if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { -// oldM = append(oldM, &msgFromMQ) -// UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM -// } else { -// m := make([]*pbMsg.MsgDataToMQ, 0, 100) -// m = append(m, &msgFromMQ) -// UserAggregationMsgs[string(consumerMessages[i].Key)] = m -// } -// } -// log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) -// for userID, v := range UserAggregationMsgs { -// if len(v) >= 0 { -// hashCode := getHashCode(userID) -// channelID := hashCode % ChannelNum -// log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) -// //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { -// och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} -// //}(channelID, userID, v) -// } -// } -// } -// } -// -// } -// -//} +func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { + for { + UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) + select { + case cmd := <-och.msgDistributionCh: + switch cmd.Cmd { + case ConsumerMsgs: + triggerChannelValue := cmd.Value.(TriggerChannelValue) + triggerID := triggerChannelValue.triggerID + consumerMessages := triggerChannelValue.cmsgList + //Aggregation map[userid]message list + log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) + for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) + if err != nil { + log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) + return + } + log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) + if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + oldM = append(oldM, &msgFromMQ) + UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + } else { + m := make([]*pbMsg.MsgDataToMQ, 0, 100) + m = append(m, &msgFromMQ) + UserAggregationMsgs[string(consumerMessages[i].Key)] = m + } + } + log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) + for userID, v := range UserAggregationMsgs { + if len(v) >= 0 { + hashCode := getHashCode(userID) + channelID := hashCode % ChannelNum + log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} + //}(channelID, userID, v) + } + } + } + } + + } + +} func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value now := time.Now() @@ -393,60 +393,60 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - //cMsg := make([]*sarama.ConsumerMessage, 0, 1000) - //t := time.NewTicker(time.Duration(100) * time.Millisecond) + cMsg := make([]*sarama.ConsumerMessage, 0, 1000) + t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string for msg := range claim.Messages() { - msgFromMQ := pbMsg.MsgDataToMQ{} - err := proto.Unmarshal(msg.Value, &msgFromMQ) - if err != nil { - log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) - } - userID := string(msg.Key) - hashCode := getHashCode(userID) - channelID := hashCode % ChannelNum - log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) - //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} - sess.MarkMessage(msg, "") - //cMsg = append(cMsg, msg) + //msgFromMQ := pbMsg.MsgDataToMQ{} + //err := proto.Unmarshal(msg.Value, &msgFromMQ) + //if err != nil { + // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) + //} + //userID := string(msg.Key) + //hashCode := getHashCode(userID) + //channelID := hashCode % ChannelNum + //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} + //sess.MarkMessage(msg, "") + cMsg = append(cMsg, msg) //och.TriggerCmd(OnlineTopicBusy) - //select { - ////case : - //// triggerID = utils.OperationIDGenerator() - //// - //// log.NewDebug(triggerID, "claim.Messages ", msg) - //// cMsg = append(cMsg, msg) - //// if len(cMsg) >= 1000 { - //// ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - //// for _, v := range cMsg { - //// ccMsg = append(ccMsg, v) - //// } - //// log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) - //// och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - //// triggerID: triggerID, cmsgList: ccMsg}} - //// sess.MarkMessage(msg, "") - //// cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - //// log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) - //// } + select { + //case : + // triggerID = utils.OperationIDGenerator() // - //case <-t.C: - // if len(cMsg) > 0 { + // log.NewDebug(triggerID, "claim.Messages ", msg) + // cMsg = append(cMsg, msg) + // if len(cMsg) >= 1000 { // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) // for _, v := range cMsg { // ccMsg = append(ccMsg, v) // } - // triggerID = utils.OperationIDGenerator() - // log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) + // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ // triggerID: triggerID, cmsgList: ccMsg}} - // sess.MarkMessage(cMsg[len(cMsg)-1], "") + // sess.MarkMessage(msg, "") // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - // log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) + // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) // } - //default: - // - //} + + case <-t.C: + if len(cMsg) > 0 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + } + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg}} + sess.MarkMessage(ccMsg[len(cMsg)-1], "") + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) + } + default: + + } //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) }