diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index c20422551..7e4026088 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -389,7 +389,7 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 0, 500) + cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string for { @@ -397,8 +397,8 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS select { case msg := <-claim.Messages(): cMsg = append(cMsg, msg) - if len(cMsg) >= 500 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 500) + if len(cMsg) >= 1000 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) for _, v := range cMsg { ccMsg = append(ccMsg, v) @@ -408,12 +408,12 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(msg, "") - cMsg = make([]*sarama.ConsumerMessage, 0, 500) + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) } case <-t.C: if len(cMsg) > 0 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 500) + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) for _, v := range cMsg { ccMsg = append(ccMsg, v) } @@ -422,7 +422,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(cMsg[len(cMsg)-1], "") - cMsg = make([]*sarama.ConsumerMessage, 0, 500) + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) }