From b7309cdda88060ffcf804c5ba1f93d76944f2080 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Mon, 23 May 2022 19:56:34 +0800 Subject: [PATCH 1/4] log --- pkg/common/kafka/consumer_group.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 3af714373..01b84e85e 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -8,6 +8,7 @@ package kafka import ( "context" + "fmt" "github.com/Shopify/sarama" ) @@ -28,6 +29,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []stri config.Version = consumerConfig.KafkaVersion config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial config.Consumer.Return.Errors = consumerConfig.IsReturnErr + fmt.Println("init address is ", addr, "topics is ", topics) client, err := sarama.NewClient(addr, config) if err != nil { panic(err.Error()) From 66c62d88758944741f0e5de68b8407cc3b003547 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Mon, 23 May 2022 20:12:54 +0800 Subject: [PATCH 2/4] consumer update --- pkg/common/kafka/consumer_group.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 01b84e85e..6da75972d 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -24,17 +24,13 @@ type MConsumerGroupConfig struct { IsReturnErr bool } -func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup { +func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { config := sarama.NewConfig() config.Version = consumerConfig.KafkaVersion config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial config.Consumer.Return.Errors = consumerConfig.IsReturnErr - fmt.Println("init address is ", addr, "topics is ", topics) - client, err := sarama.NewClient(addr, config) - if err != nil { - panic(err.Error()) - } - consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) + fmt.Println("init address is ", addrs, "topics is ", topics) + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) if err != nil { panic(err.Error()) } From ed3a1296d853313b2292e88d8473a18289276224 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Mon, 23 May 2022 20:20:06 +0800 Subject: [PATCH 3/4] consumer update --- internal/msg_transfer/logic/online_history_msg_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 8e4e3c2a2..80360eb3c 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -58,7 +58,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability } - och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) From 7fc6b61f21885c2728cfd10ef552f41d98eba70d Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Mon, 23 May 2022 20:39:44 +0800 Subject: [PATCH 4/4] consumer update --- internal/msg_transfer/logic/offline_history_msg_handler.go | 2 +- internal/msg_transfer/logic/persistent_msg_handler.go | 2 +- internal/push/logic/push_handler.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 78c867851..504ed5d9c 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -38,7 +38,7 @@ func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability } - mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 7418e629e..245634d84 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -26,7 +26,7 @@ type PersistentConsumerHandler struct { func (pc *PersistentConsumerHandler) Init() { pc.msgHandle = make(map[string]fcb) pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql - pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index 5412b8b3e..e18f0c136 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -26,7 +26,7 @@ type PushConsumerHandler struct { func (ms *PushConsumerHandler) Init() { ms.msgHandle = make(map[string]fcb) ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat - ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) }