From bcfa3efef7b3d729bfab6139ebf08ad704709b16 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 21:15:17 +0800 Subject: [PATCH] channelNum --- .../logic/online_history_msg_handler.go | 84 ++++++++++--------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 095319f10..48f190dde 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -418,51 +418,55 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } go func() { - select { - //case : - // triggerID = utils.OperationIDGenerator() - // - // log.NewDebug(triggerID, "claim.Messages ", msg) - // cMsg = append(cMsg, msg) - // if len(cMsg) >= 1000 { - // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - // for _, v := range cMsg { - // ccMsg = append(ccMsg, v) - // } - // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) - // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - // triggerID: triggerID, cmsgList: ccMsg}} - // sess.MarkMessage(msg, "") - // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) - // } + for { + select { + //case : + // triggerID = utils.OperationIDGenerator() + // + // log.NewDebug(triggerID, "claim.Messages ", msg) + // cMsg = append(cMsg, msg) + // if len(cMsg) >= 1000 { + // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + // for _, v := range cMsg { + // ccMsg = append(ccMsg, v) + // } + // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + // triggerID: triggerID, cmsgList: ccMsg}} + // sess.MarkMessage(msg, "") + // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + // } - case <-t.C: - if len(cMsg) > 0 { - rwLock.Lock() - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - rwLock.Unlock() - split := 1000 - triggerID = utils.OperationIDGenerator() - for i := 0; i < len(ccMsg)/split; i++ { - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} - } - if (len(ccMsg) % split) > 0 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}} + case <-t.C: + if len(cMsg) > 0 { + rwLock.Lock() + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + } + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + rwLock.Unlock() + split := 1000 + triggerID = utils.OperationIDGenerator() + for i := 0; i < len(ccMsg)/split; i++ { + log.NewWarn(triggerID, "timer trigger msg consumer start", len(ccMsg)) + //log.Debug() + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} + } + if (len(ccMsg) % split) > 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}} + } + //sess.MarkMessage(ccMsg[len(cMsg)-1], "") + + log.NewWarn(triggerID, "timer trigger msg consumer end", len(cMsg)) } - //sess.MarkMessage(ccMsg[len(cMsg)-1], "") - log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } - } + }() return nil