diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index a915fce0e..fb5b33ce9 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -17,6 +17,7 @@ import ( "github.com/golang/protobuf/proto" "hash/crc32" "strings" + "sync" "time" ) @@ -50,11 +51,11 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { go och.MessagesDistributionHandle() och.cmdCh = cmdCh for i := 0; i < ChannelNum; i++ { - och.chArrays[i] = make(chan Cmd2Value, 100) + och.chArrays[i] = make(chan Cmd2Value, 50) go och.Run(i) } for i := 0; i < ChannelNum; i++ { - och.chMongoArrays[i] = make(chan Cmd2Value, 1000) + och.chMongoArrays[i] = make(chan Cmd2Value, 10000) go och.MongoMessageRun(i) } if config.Config.ReliableStorage { @@ -153,7 +154,6 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { } } func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { - return hashCode := getHashCode(userID) channelID := hashCode % ChannelNum log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) @@ -389,11 +389,62 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS break } } - + rwLock := new(sync.RWMutex) log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string + go func() { + for { + select { + //case : + // triggerID = utils.OperationIDGenerator() + // + // log.NewDebug(triggerID, "claim.Messages ", msg) + // cMsg = append(cMsg, msg) + // if len(cMsg) >= 1000 { + // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + // for _, v := range cMsg { + // ccMsg = append(ccMsg, v) + // } + // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + // triggerID: triggerID, cmsgList: ccMsg}} + // sess.MarkMessage(msg, "") + // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + // } + + case <-t.C: + if len(cMsg) > 0 { + rwLock.Lock() + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + } + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + rwLock.Unlock() + split := 1000 + triggerID = utils.OperationIDGenerator() + log.NewWarn(triggerID, "timer trigger msg consumer start", len(ccMsg)) + for i := 0; i < len(ccMsg)/split; i++ { + //log.Debug() + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} + } + if (len(ccMsg) % split) > 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}} + } + //sess.MarkMessage(ccMsg[len(cMsg)-1], "") + + log.NewWarn(triggerID, "timer trigger msg consumer end", len(cMsg)) + } + + } + } + + }() for msg := range claim.Messages() { //msgFromMQ := pbMsg.MsgDataToMQ{} //err := proto.Unmarshal(msg.Value, &msgFromMQ) @@ -407,47 +458,16 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} //sess.MarkMessage(msg, "") + rwLock.Lock() cMsg = append(cMsg, msg) + rwLock.Unlock() + sess.MarkMessage(msg, "") //och.TriggerCmd(OnlineTopicBusy) - select { - //case : - // triggerID = utils.OperationIDGenerator() - // - // log.NewDebug(triggerID, "claim.Messages ", msg) - // cMsg = append(cMsg, msg) - // if len(cMsg) >= 1000 { - // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - // for _, v := range cMsg { - // ccMsg = append(ccMsg, v) - // } - // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) - // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - // triggerID: triggerID, cmsgList: ccMsg}} - // sess.MarkMessage(msg, "") - // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) - // } - - case <-t.C: - if len(cMsg) > 0 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(ccMsg[len(cMsg)-1], "") - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) - } - default: - } //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) } + return nil }