diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 78c867851..504ed5d9c 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -38,7 +38,7 @@ func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability } - mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 8e4e3c2a2..80360eb3c 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -58,7 +58,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability } - och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 7418e629e..245634d84 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -26,7 +26,7 @@ type PersistentConsumerHandler struct { func (pc *PersistentConsumerHandler) Init() { pc.msgHandle = make(map[string]fcb) pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql - pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index 5412b8b3e..e18f0c136 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -26,7 +26,7 @@ type PushConsumerHandler struct { func (ms *PushConsumerHandler) Init() { ms.msgHandle = make(map[string]fcb) ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat - ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 3af714373..6da75972d 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -8,6 +8,7 @@ package kafka import ( "context" + "fmt" "github.com/Shopify/sarama" ) @@ -23,16 +24,13 @@ type MConsumerGroupConfig struct { IsReturnErr bool } -func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup { +func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { config := sarama.NewConfig() config.Version = consumerConfig.KafkaVersion config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial config.Consumer.Return.Errors = consumerConfig.IsReturnErr - client, err := sarama.NewClient(addr, config) - if err != nil { - panic(err.Error()) - } - consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) + fmt.Println("init address is ", addrs, "topics is ", topics) + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) if err != nil { panic(err.Error()) }