diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 7e5053df1..007896bd4 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -389,8 +389,11 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + if sess == nil { + time.Sleep(100 * time.Millisecond) + } log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 0, 500) + cMsg := make([]*sarama.ConsumerMessage, 0, 1000) t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string for { @@ -398,8 +401,8 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS select { case msg := <-claim.Messages(): cMsg = append(cMsg, msg) - if len(cMsg) >= 500 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 500) + if len(cMsg) >= 1000 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) for _, v := range cMsg { ccMsg = append(ccMsg, v) @@ -409,12 +412,12 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(msg, "") - cMsg = make([]*sarama.ConsumerMessage, 0, 500) + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) } case <-t.C: if len(cMsg) > 0 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 500) + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) for _, v := range cMsg { ccMsg = append(ccMsg, v) } @@ -423,7 +426,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(cMsg[len(cMsg)-1], "") - cMsg = make([]*sarama.ConsumerMessage, 0, 500) + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) }