@ -19,7 +19,7 @@ import (
type fcb func ( msg [ ] byte , msgKey string )
type Cmd2Value struct {
Cmd string
Cmd int
Value interface { }
}
type OnlineHistoryConsumerHandler struct {
@ -37,7 +37,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
config . Config . Kafka . Ws2mschat . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMongo )
}
func ( och * OnlineHistoryConsumerHandler ) TriggerCmd ( status string ) {
func ( och * OnlineHistoryConsumerHandler ) TriggerCmd ( status int ) {
operationID := utils . OperationIDGenerator ( )
for {
err := sendCmd ( och . cmdCh , Cmd2Value { Cmd : status , Value : "" } , 1 )
@ -135,15 +135,15 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
for msg := range claim . Messages ( ) {
och . TriggerCmd ( OnlineTopicBusy )
SetOnlineTopicStatus ( OnlineTopicBusy )
log . NewDebug ( "" , "kafka get info to mongo" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "online" )
//och.TriggerCmd(OnlineTopicBusy)
log . NewDebug ( "" , "kafka get info to mongo" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "online" , msg . Offset , claim . HighWaterMarkOffset ( ) )
och . msgHandle [ msg . Topic ] ( msg . Value , string ( msg . Key ) )
sess . MarkMessage ( msg , "" )
if claim . HighWaterMarkOffset ( ) - msg . Offset <= 1 {
log . Debug ( "" , "online msg consume end" , claim . HighWaterMarkOffset ( ) , msg . Offset )
och . TriggerCmd ( OnlineTopicVacancy )
SetOnlineTopicStatus ( OnlineTopicVacancy )
och . TriggerCmd ( OnlineTopicVacancy )
}
}
return nil