@ -17,6 +17,7 @@ import (
"github.com/golang/protobuf/proto"
"hash/crc32"
"strings"
"sync"
"time"
)
@ -389,7 +390,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
break
}
}
rwLock := new ( sync . RWMutex )
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
cMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
t := time . NewTicker ( time . Duration ( 100 ) * time . Millisecond )
@ -407,8 +408,16 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
//och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
//sess.MarkMessage(msg, "")
rwLock . Lock ( )
cMsg = append ( cMsg , msg )
rwLock . Unlock ( )
sess . MarkMessage ( msg , "" )
//och.TriggerCmd(OnlineTopicBusy)
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
}
go func ( ) {
select {
//case :
// triggerID = utils.OperationIDGenerator()
@ -430,24 +439,26 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
case <- t . C :
if len ( cMsg ) > 0 {
rwLock . Lock ( )
ccMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
for _ , v := range cMsg {
ccMsg = append ( ccMsg , v )
}
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
rwLock . Unlock ( )
triggerID = utils . OperationIDGenerator ( )
log . Debug ( triggerID , "timer trigger msg consumer start" , len ( ccMsg ) )
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
triggerID : triggerID , cmsgList : ccMsg } }
sess . MarkMessage ( ccMsg [ len ( cMsg ) - 1 ] , "" )
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
//sess.MarkMessage(ccMsg[len(cMsg)-1], "")
log . Debug ( triggerID , "timer trigger msg consumer end" , len ( cMsg ) )
}
default :
}
//log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
} ( )
}
return nil
}