pull/232/head
Gordon 3 years ago
parent 7267fd080e
commit dc2bc7fbf7

@ -42,13 +42,13 @@ type OnlineHistoryConsumerHandler struct {
msgCh chan Cmd2Value
chArrays [ChannelNum]chan Cmd2Value
chMongoArrays [ChannelNum]chan Cmd2Value
//msgDistributionCh chan Cmd2Value
msgDistributionCh chan Cmd2Value
}
func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
och.msgHandle = make(map[string]fcb)
//och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
//go och.MessagesDistributionHandle()
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
go och.MessagesDistributionHandle()
och.cmdCh = cmdCh
och.msgCh = make(chan Cmd2Value, 1000)
for i := 0; i < ChannelNum; i++ {
@ -202,52 +202,52 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
}
}
//func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
// for {
// UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
// select {
// case cmd := <-och.msgDistributionCh:
// switch cmd.Cmd {
// case ConsumerMsgs:
// triggerChannelValue := cmd.Value.(TriggerChannelValue)
// triggerID := triggerChannelValue.triggerID
// consumerMessages := triggerChannelValue.cmsgList
// //Aggregation map[userid]message list
// log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages))
// for i := 0; i < len(consumerMessages); i++ {
// msgFromMQ := pbMsg.MsgDataToMQ{}
// err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
// if err != nil {
// log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error())
// return
// }
// log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
// if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
// oldM = append(oldM, &msgFromMQ)
// UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
// } else {
// m := make([]*pbMsg.MsgDataToMQ, 0, 100)
// m = append(m, &msgFromMQ)
// UserAggregationMsgs[string(consumerMessages[i].Key)] = m
// }
// }
// log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
// for userID, v := range UserAggregationMsgs {
// if len(v) >= 0 {
// hashCode := getHashCode(userID)
// channelID := hashCode % ChannelNum
// log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
// //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
// och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}}
// //}(channelID, userID, v)
// }
// }
// }
// }
//
// }
//
//}
func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
for {
UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
select {
case cmd := <-och.msgDistributionCh:
switch cmd.Cmd {
case ConsumerMsgs:
triggerChannelValue := cmd.Value.(TriggerChannelValue)
triggerID := triggerChannelValue.triggerID
consumerMessages := triggerChannelValue.cmsgList
//Aggregation map[userid]message list
log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages))
for i := 0; i < len(consumerMessages); i++ {
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil {
log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error())
return
}
log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
} else {
m := make([]*pbMsg.MsgDataToMQ, 0, 100)
m = append(m, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = m
}
}
log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
for userID, v := range UserAggregationMsgs {
if len(v) >= 0 {
hashCode := getHashCode(userID)
channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}}
//}(channelID, userID, v)
}
}
}
}
}
}
func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
now := time.Now()
@ -393,60 +393,60 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
}
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
//cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
//t := time.NewTicker(time.Duration(100) * time.Millisecond)
cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
t := time.NewTicker(time.Duration(100) * time.Millisecond)
var triggerID string
for msg := range claim.Messages() {
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg.Value, &msgFromMQ)
if err != nil {
log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error())
}
userID := string(msg.Key)
hashCode := getHashCode(userID)
channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
sess.MarkMessage(msg, "")
//cMsg = append(cMsg, msg)
//msgFromMQ := pbMsg.MsgDataToMQ{}
//err := proto.Unmarshal(msg.Value, &msgFromMQ)
//if err != nil {
// log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error())
//}
//userID := string(msg.Key)
//hashCode := getHashCode(userID)
//channelID := hashCode % ChannelNum
//log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
//och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
//sess.MarkMessage(msg, "")
cMsg = append(cMsg, msg)
//och.TriggerCmd(OnlineTopicBusy)
//select {
////case :
//// triggerID = utils.OperationIDGenerator()
////
//// log.NewDebug(triggerID, "claim.Messages ", msg)
//// cMsg = append(cMsg, msg)
//// if len(cMsg) >= 1000 {
//// ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
//// for _, v := range cMsg {
//// ccMsg = append(ccMsg, v)
//// }
//// log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg))
//// och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
//// triggerID: triggerID, cmsgList: ccMsg}}
//// sess.MarkMessage(msg, "")
//// cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
//// log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
//// }
select {
//case :
// triggerID = utils.OperationIDGenerator()
//
//case <-t.C:
// if len(cMsg) > 0 {
// log.NewDebug(triggerID, "claim.Messages ", msg)
// cMsg = append(cMsg, msg)
// if len(cMsg) >= 1000 {
// ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
// for _, v := range cMsg {
// ccMsg = append(ccMsg, v)
// }
// triggerID = utils.OperationIDGenerator()
// log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg))
// log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg))
// och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
// triggerID: triggerID, cmsgList: ccMsg}}
// sess.MarkMessage(cMsg[len(cMsg)-1], "")
// sess.MarkMessage(msg, "")
// cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
// log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))
// log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
// }
//default:
//
//}
case <-t.C:
if len(cMsg) > 0 {
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
for _, v := range cMsg {
ccMsg = append(ccMsg, v)
}
triggerID = utils.OperationIDGenerator()
log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg))
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: ccMsg}}
sess.MarkMessage(ccMsg[len(cMsg)-1], "")
cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))
}
default:
}
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
}

Loading…
Cancel
Save