diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 7baccf176..2b6a1bf18 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -466,7 +466,9 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS //och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} //sess.MarkMessage(msg, "") rwLock.Lock() - cMsg = append(cMsg, msg) + if len(msg.Value) != 0 { + cMsg = append(cMsg, msg) + } rwLock.Unlock() sess.MarkMessage(msg, "") //och.TriggerCmd(OnlineTopicBusy) diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 83ca7bd12..f0104740a 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -72,7 +72,9 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) - pc.msgHandle[msg.Topic](msg, string(msg.Key), sess) + if len(msg.Value) != 0 { + pc.msgHandle[msg.Topic](msg, string(msg.Key), sess) + } sess.MarkMessage(msg, "") } return nil