|
|
@ -226,12 +226,15 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
|
|
|
|
if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
|
|
|
|
oldM = append(oldM, &msgFromMQ)
|
|
|
|
oldM = append(oldM, &msgFromMQ)
|
|
|
|
och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
|
|
|
|
och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
|
|
|
|
|
|
|
|
log.Debug(operationID, "consumerMessages key ", string(consumerMessages[i].Key), oldM)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
m := make([]*pbMsg.MsgDataToMQ, 100)
|
|
|
|
m := make([]*pbMsg.MsgDataToMQ, 100)
|
|
|
|
m = append(m, &msgFromMQ)
|
|
|
|
m = append(m, &msgFromMQ)
|
|
|
|
och.UserAggregationMsgs[string(consumerMessages[i].Key)] = m
|
|
|
|
och.UserAggregationMsgs[string(consumerMessages[i].Key)] = m
|
|
|
|
|
|
|
|
log.Debug(operationID, "consumerMessages key ", string(consumerMessages[i].Key), m)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug(operationID, "UserAggregationMsgs len ", len(och.UserAggregationMsgs))
|
|
|
|
for userID, v := range och.UserAggregationMsgs {
|
|
|
|
for userID, v := range och.UserAggregationMsgs {
|
|
|
|
if len(v) >= 0 {
|
|
|
|
if len(v) >= 0 {
|
|
|
|
channelID := getHashCode(userID) % ChannelNum
|
|
|
|
channelID := getHashCode(userID) % ChannelNum
|
|
|
|