@ -390,45 +390,44 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
cMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 500 )
t := time . NewTicker ( time . Duration ( 10 ) * time . Millisecond )
t := time . NewTicker ( time . Duration ( 10 0 ) * time . Millisecond )
var triggerID string
for msg := range claim . Messages ( ) {
for {
//och.TriggerCmd(OnlineTopicBusy)
cMsg = append ( cMsg , msg )
select {
case <- t . C :
if len ( cMsg ) >= 0 {
case msg := <- claim . Messages ( ) :
cMsg = append ( cMsg , msg )
if len ( cMsg ) >= 500 {
ccMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 500 )
for _ , v := range cMsg {
ccMsg = append ( ccMsg , v )
}
triggerID = utils . OperationIDGenerator ( )
log . Debug ( triggerID , " timer trigger msg consumer start", len ( ccMsg ) )
log . Debug ( triggerID , " length trigger msg consumer start", len ( ccMsg ) )
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
triggerID : triggerID , cmsgList : ccMsg } }
sess . MarkMessage ( msg , "" )
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 500 )
log . Debug ( triggerID , " timer trigger msg consumer end", len ( cMsg ) )
log . Debug ( triggerID , " length trigger msg consumer end", len ( cMsg ) )
}
default :
if len ( cMsg ) >= 50 0 {
case <- t . C :
if len ( cMsg ) > 0 {
ccMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 500 )
for _ , v := range cMsg {
ccMsg = append ( ccMsg , v )
}
triggerID = utils . OperationIDGenerator ( )
log . Debug ( triggerID , " length trigger msg consumer start", len ( ccMsg ) )
log . Debug ( triggerID , " timer trigger msg consumer start", len ( ccMsg ) )
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
triggerID : triggerID , cmsgList : ccMsg } }
sess . MarkMessage ( msg , "" )
sess . MarkMessage ( cMsg[ len ( cMsg ) - 1 ] , "" )
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 500 )
log . Debug ( triggerID , " length trigger msg consumer end", len ( cMsg ) )
log . Debug ( triggerID , " timer trigger msg consumer end", len ( cMsg ) )
}
}
log . NewDebug ( "" , "online kafka get info to mongo" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "online" , msg . Offset , claim . HighWaterMarkOffset ( ) )
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
}
return nil