From b48197c8aac95563b79375f5352a62a1f1b73385 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 24 May 2022 22:14:30 +0800 Subject: [PATCH] redis add get message --- .../logic/online_history_msg_handler.go | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index f409da44a..211e2a416 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -443,26 +443,27 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string - for { + for msg := range claim.Messages() { + cMsg = append(cMsg, msg) //och.TriggerCmd(OnlineTopicBusy) select { - case msg := <-claim.Messages(): - triggerID = utils.OperationIDGenerator() - - log.NewDebug(triggerID, "claim.Messages ", msg) - cMsg = append(cMsg, msg) - if len(cMsg) >= 1000 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(msg, "") - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) - } + //case : + // triggerID = utils.OperationIDGenerator() + // + // log.NewDebug(triggerID, "claim.Messages ", msg) + // cMsg = append(cMsg, msg) + // if len(cMsg) >= 1000 { + // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + // for _, v := range cMsg { + // ccMsg = append(ccMsg, v) + // } + // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + // triggerID: triggerID, cmsgList: ccMsg}} + // sess.MarkMessage(msg, "") + // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + // } case <-t.C: if len(cMsg) > 0 {