From 8753f91a4ad2e041a90e2b267e6e03aa94d0e8f4 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 18:38:58 +0800 Subject: [PATCH 1/6] send group message split topic --- internal/msg_transfer/logic/init.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index dff95334c..a9c8778b8 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -27,6 +27,7 @@ var ( func Init() { cmdCh = make(chan Cmd2Value, 10000) + w = new(sync.Mutex) persistentCH.Init() historyCH.Init(cmdCh) offlineHistoryCH.Init(cmdCh) From ab747d03383c336811d9c89c1dfca041bf18d0df Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 18:49:21 +0800 Subject: [PATCH 2/6] test --- internal/msg_transfer/logic/offline_history_msg_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index c4e78f96d..f8fb6f3af 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -19,6 +19,7 @@ type OfflineHistoryConsumerHandler struct { } func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { + log.Debug("ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic) mc.msgHandle = make(map[string]fcb) mc.cmdCh = cmdCh mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2Mongo From 9a90c2e56847d4b0d92336589db028eca641c856 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 18:54:22 +0800 Subject: [PATCH 3/6] test --- internal/msg_transfer/logic/init.go | 2 ++ internal/msg_transfer/logic/offline_history_msg_handler.go | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index a9c8778b8..2673665f7 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" + "Open_IM/pkg/common/log" "Open_IM/pkg/statistics" "fmt" "sync" @@ -30,6 +31,7 @@ func Init() { w = new(sync.Mutex) persistentCH.Init() historyCH.Init(cmdCh) + log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic) offlineHistoryCH.Init(cmdCh) statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index f8fb6f3af..c4e78f96d 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -19,7 +19,6 @@ type OfflineHistoryConsumerHandler struct { } func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { - log.Debug("ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic) mc.msgHandle = make(map[string]fcb) mc.cmdCh = cmdCh mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2Mongo From 3ecc1816be5c01827f2c027e001c3804d5dfe1e8 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 18:58:11 +0800 Subject: [PATCH 4/6] test --- config/config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.yaml b/config/config.yaml index c1134b9cf..e49f73658 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -49,7 +49,7 @@ kafka: topic: "ms2ps_chat" consumergroupid: msgToMongo: mongo - MsgToMongoOffline: mongo_offline + msgToMongoOffline: mongo_offline msgToMySql: mysql msgToPush: push From dc6b0b4dde0d99f6b3f924b5009ba97c685f04ac Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 19:16:57 +0800 Subject: [PATCH 5/6] test --- .../logic/offline_history_msg_handler.go | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index c4e78f96d..4d1618853 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -98,19 +98,24 @@ func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - select { - case cmd := <-mc.cmdCh: - if cmd.Cmd == OnlineTopicVacancy { - for msg := range claim.Messages() { - if GetOnlineTopicStatus() == OnlineTopicVacancy { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } - } - } + log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + for msg := range claim.Messages() { + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "offline") + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) } + //select { + //case cmd := <-mc.cmdCh: + // if cmd.Cmd == OnlineTopicVacancy { + // for msg := range claim.Messages() { + // if GetOnlineTopicStatus() == OnlineTopicVacancy { + // log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + // mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + // sess.MarkMessage(msg, "") + // } + // } + // } + // + //} return nil } From cec5a6ceeb09430df4b509c5c0626ed000aa5e43 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 19:21:28 +0800 Subject: [PATCH 6/6] test --- internal/msg_transfer/logic/offline_history_msg_handler.go | 2 +- internal/msg_transfer/logic/online_history_msg_handler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 4d1618853..3b0a3b759 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -101,7 +101,7 @@ func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "offline") + log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", "offline") mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) } //select { diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 597d39d05..0286a348b 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -136,7 +136,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS och.TriggerCmd(OnlineTopicBusy) SetOnlineTopicStatus(OnlineTopicBusy) for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online") och.msgHandle[msg.Topic](msg.Value, string(msg.Key)) sess.MarkMessage(msg, "") if claim.HighWaterMarkOffset()-msg.Offset <= 1 {