|
|
@ -35,13 +35,16 @@ type OnlineHistoryMongoConsumerHandler struct {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
|
|
|
|
func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
|
|
|
|
tlsConfig := &kfk.TLSConfig{
|
|
|
|
var tlsConfig *kfk.TLSConfig
|
|
|
|
|
|
|
|
if config.Kafka.TLS != nil {
|
|
|
|
|
|
|
|
tlsConfig = &kfk.TLSConfig{
|
|
|
|
CACrt: config.Kafka.TLS.CACrt,
|
|
|
|
CACrt: config.Kafka.TLS.CACrt,
|
|
|
|
ClientCrt: config.Kafka.TLS.ClientCrt,
|
|
|
|
ClientCrt: config.Kafka.TLS.ClientCrt,
|
|
|
|
ClientKey: config.Kafka.TLS.ClientKey,
|
|
|
|
ClientKey: config.Kafka.TLS.ClientKey,
|
|
|
|
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
|
|
|
|
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
|
|
|
|
InsecureSkipVerify: false,
|
|
|
|
InsecureSkipVerify: false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
|
|
|
|
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
|
|
|
|
KafkaVersion: sarama.V2_0_0_0,
|
|
|
|
KafkaVersion: sarama.V2_0_0_0,
|
|
|
|
OffsetsInitial: sarama.OffsetNewest,
|
|
|
|
OffsetsInitial: sarama.OffsetNewest,
|
|
|
|