Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

pull/232/head
wangchuxiao 3 years ago
commit 778c1852b8

@ -16,7 +16,7 @@ const Msg = 2
const ConsumerMsgs = 3 const ConsumerMsgs = 3
const UserMessages = 4 const UserMessages = 4
const MongoMessages = 5 const MongoMessages = 5
const ChannelNum = 10 const ChannelNum = 100
var ( var (
persistentCH PersistentConsumerHandler persistentCH PersistentConsumerHandler

@ -42,13 +42,13 @@ type OnlineHistoryConsumerHandler struct {
msgCh chan Cmd2Value msgCh chan Cmd2Value
chArrays [ChannelNum]chan Cmd2Value chArrays [ChannelNum]chan Cmd2Value
chMongoArrays [ChannelNum]chan Cmd2Value chMongoArrays [ChannelNum]chan Cmd2Value
//msgDistributionCh chan Cmd2Value msgDistributionCh chan Cmd2Value
} }
func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
och.msgHandle = make(map[string]fcb) och.msgHandle = make(map[string]fcb)
//och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
//go och.MessagesDistributionHandle() go och.MessagesDistributionHandle()
och.cmdCh = cmdCh och.cmdCh = cmdCh
och.msgCh = make(chan Cmd2Value, 1000) och.msgCh = make(chan Cmd2Value, 1000)
for i := 0; i < ChannelNum; i++ { for i := 0; i < ChannelNum; i++ {
@ -202,52 +202,52 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
} }
} }
//func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
// for { for {
// UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
// select { select {
// case cmd := <-och.msgDistributionCh: case cmd := <-och.msgDistributionCh:
// switch cmd.Cmd { switch cmd.Cmd {
// case ConsumerMsgs: case ConsumerMsgs:
// triggerChannelValue := cmd.Value.(TriggerChannelValue) triggerChannelValue := cmd.Value.(TriggerChannelValue)
// triggerID := triggerChannelValue.triggerID triggerID := triggerChannelValue.triggerID
// consumerMessages := triggerChannelValue.cmsgList consumerMessages := triggerChannelValue.cmsgList
// //Aggregation map[userid]message list //Aggregation map[userid]message list
// log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages))
// for i := 0; i < len(consumerMessages); i++ { for i := 0; i < len(consumerMessages); i++ {
// msgFromMQ := pbMsg.MsgDataToMQ{} msgFromMQ := pbMsg.MsgDataToMQ{}
// err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
// if err != nil { if err != nil {
// log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error())
// return return
// } }
// log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
// if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
// oldM = append(oldM, &msgFromMQ) oldM = append(oldM, &msgFromMQ)
// UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
// } else { } else {
// m := make([]*pbMsg.MsgDataToMQ, 0, 100) m := make([]*pbMsg.MsgDataToMQ, 0, 100)
// m = append(m, &msgFromMQ) m = append(m, &msgFromMQ)
// UserAggregationMsgs[string(consumerMessages[i].Key)] = m UserAggregationMsgs[string(consumerMessages[i].Key)] = m
// } }
// } }
// log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
// for userID, v := range UserAggregationMsgs { for userID, v := range UserAggregationMsgs {
// if len(v) >= 0 { if len(v) >= 0 {
// hashCode := getHashCode(userID) hashCode := getHashCode(userID)
// channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
// log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
// //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
// och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}}
// //}(channelID, userID, v) //}(channelID, userID, v)
// } }
// } }
// } }
// } }
//
// } }
//
//} }
func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value msg := cMsg.Value
now := time.Now() now := time.Now()
@ -393,60 +393,60 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
} }
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
//cMsg := make([]*sarama.ConsumerMessage, 0, 1000) cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
//t := time.NewTicker(time.Duration(100) * time.Millisecond) t := time.NewTicker(time.Duration(100) * time.Millisecond)
var triggerID string var triggerID string
for msg := range claim.Messages() { for msg := range claim.Messages() {
msgFromMQ := pbMsg.MsgDataToMQ{} //msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg.Value, &msgFromMQ) //err := proto.Unmarshal(msg.Value, &msgFromMQ)
if err != nil { //if err != nil {
log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) // log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error())
} //}
userID := string(msg.Key) //userID := string(msg.Key)
hashCode := getHashCode(userID) //hashCode := getHashCode(userID)
channelID := hashCode % ChannelNum //channelID := hashCode % ChannelNum
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) //log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
sess.MarkMessage(msg, "") //sess.MarkMessage(msg, "")
//cMsg = append(cMsg, msg) cMsg = append(cMsg, msg)
//och.TriggerCmd(OnlineTopicBusy) //och.TriggerCmd(OnlineTopicBusy)
//select { select {
////case : //case :
//// triggerID = utils.OperationIDGenerator() // triggerID = utils.OperationIDGenerator()
////
//// log.NewDebug(triggerID, "claim.Messages ", msg)
//// cMsg = append(cMsg, msg)
//// if len(cMsg) >= 1000 {
//// ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
//// for _, v := range cMsg {
//// ccMsg = append(ccMsg, v)
//// }
//// log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg))
//// och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
//// triggerID: triggerID, cmsgList: ccMsg}}
//// sess.MarkMessage(msg, "")
//// cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
//// log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
//// }
// //
//case <-t.C: // log.NewDebug(triggerID, "claim.Messages ", msg)
// if len(cMsg) > 0 { // cMsg = append(cMsg, msg)
// if len(cMsg) >= 1000 {
// ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
// for _, v := range cMsg { // for _, v := range cMsg {
// ccMsg = append(ccMsg, v) // ccMsg = append(ccMsg, v)
// } // }
// triggerID = utils.OperationIDGenerator() // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg))
// log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg))
// och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
// triggerID: triggerID, cmsgList: ccMsg}} // triggerID: triggerID, cmsgList: ccMsg}}
// sess.MarkMessage(cMsg[len(cMsg)-1], "") // sess.MarkMessage(msg, "")
// cMsg = make([]*sarama.ConsumerMessage, 0, 1000) // cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
// log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
// } // }
//default:
// case <-t.C:
//} if len(cMsg) > 0 {
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
for _, v := range cMsg {
ccMsg = append(ccMsg, v)
}
triggerID = utils.OperationIDGenerator()
log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg))
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: ccMsg}}
sess.MarkMessage(ccMsg[len(cMsg)-1], "")
cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))
}
default:
}
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
} }

Loading…
Cancel
Save