pull/232/head
skiffer-git 3 years ago
parent 67cfdedcc4
commit 6f79e63b3f

@ -378,18 +378,21 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
cMsg := make([]*sarama.ConsumerMessage, 500) cMsg := make([]*sarama.ConsumerMessage, 500)
t := time.NewTicker(time.Duration(500) * time.Millisecond) t := time.NewTicker(time.Duration(500) * time.Millisecond)
for msg := range claim.Messages() { for msg := range claim.Messages() {
operationID := utils.OperationIDGenerator()
//och.TriggerCmd(OnlineTopicBusy) //och.TriggerCmd(OnlineTopicBusy)
cMsg = append(cMsg, msg) cMsg = append(cMsg, msg)
select { select {
case <-t.C: case <-t.C:
if len(cMsg) >= 0 { if len(cMsg) >= 0 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg}
log.Debug(operationID, "timer send to msgDistributionCh", och.msgDistributionCh, "len: ", len(cMsg))
sess.MarkMessage(msg, "") sess.MarkMessage(msg, "")
cMsg = cMsg[0:0] cMsg = cMsg[0:0]
} }
default: default:
if len(cMsg) >= 500 { if len(cMsg) >= 500 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg}
log.Debug(operationID, "500 send to msgDistributionCh", och.msgDistributionCh, "len: ", len(cMsg))
sess.MarkMessage(msg, "") sess.MarkMessage(msg, "")
cMsg = cMsg[0:0] cMsg = cMsg[0:0]
} }

Loading…
Cancel
Save