diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 80360eb3c..390cf6ba9 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -415,20 +415,24 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS //och.TriggerCmd(OnlineTopicBusy) select { case msg := <-claim.Messages(): - log.NewDebug("", "claim.Messages ", msg) - cMsg = append(cMsg, msg) - if len(cMsg) >= 1000 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) + triggerID = utils.OperationIDGenerator() + if msg != nil { + log.NewDebug(triggerID, "claim.Messages ", msg) + cMsg = append(cMsg, msg) + if len(cMsg) >= 1000 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + } + log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg}} + sess.MarkMessage(msg, "") + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) } - triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(msg, "") - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + } else { + log.NewWarn(triggerID, "msg is nil") } case <-t.C: