|
|
|
@ -8,6 +8,7 @@ import (
|
|
|
|
|
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
|
|
|
|
pbMsg "Open_IM/pkg/proto/chat"
|
|
|
|
|
pbPush "Open_IM/pkg/proto/push"
|
|
|
|
|
"Open_IM/pkg/statistics"
|
|
|
|
|
"Open_IM/pkg/utils"
|
|
|
|
|
"context"
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
@ -20,9 +21,14 @@ type fcb func(msg []byte, msgKey string)
|
|
|
|
|
type HistoryConsumerHandler struct {
|
|
|
|
|
msgHandle map[string]fcb
|
|
|
|
|
historyConsumerGroup *kfk.MConsumerGroup
|
|
|
|
|
singleMsgCount uint64
|
|
|
|
|
groupMsgCount uint64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (mc *HistoryConsumerHandler) Init() {
|
|
|
|
|
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 10)
|
|
|
|
|
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 10)
|
|
|
|
|
|
|
|
|
|
mc.msgHandle = make(map[string]fcb)
|
|
|
|
|
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
|
|
|
|
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
|
|
|
@ -55,6 +61,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
|
|
|
|
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
mc.singleMsgCount++
|
|
|
|
|
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
|
|
|
|
|
}
|
|
|
|
|
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
|
|
|
@ -70,6 +77,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
|
|
|
|
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
mc.groupMsgCount++
|
|
|
|
|
}
|
|
|
|
|
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
|
|
|
|
default:
|
|
|
|
|