diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 9d0fad271..c20422551 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -392,43 +392,42 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS cMsg := make([]*sarama.ConsumerMessage, 0, 500) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string - for msg := range claim.Messages() { + for { //och.TriggerCmd(OnlineTopicBusy) - cMsg = append(cMsg, msg) select { - case <-t.C: - if len(cMsg) >= 0 { + case msg := <-claim.Messages(): + cMsg = append(cMsg, msg) + if len(cMsg) >= 500 { ccMsg := make([]*sarama.ConsumerMessage, 0, 500) for _, v := range cMsg { ccMsg = append(ccMsg, v) } triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) + log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(msg, "") cMsg = make([]*sarama.ConsumerMessage, 0, 500) - log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) + log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) } - default: - if len(cMsg) >= 500 { + case <-t.C: + if len(cMsg) > 0 { ccMsg := make([]*sarama.ConsumerMessage, 0, 500) for _, v := range cMsg { ccMsg = append(ccMsg, v) - } triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(msg, "") + sess.MarkMessage(cMsg[len(cMsg)-1], "") cMsg = make([]*sarama.ConsumerMessage, 0, 500) - log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } } - log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) } return nil