From 7a2c1e48aad40280761f36213515184c76a17ef7 Mon Sep 17 00:00:00 2001 From: OpenIM-Robot Date: Thu, 13 Feb 2025 14:00:48 +0800 Subject: [PATCH] fix: the source message of the reference is withdrawn, and the referenced message is deleted (#3137) (#3140) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted Co-authored-by: chao <48119764+withchao@users.noreply.github.com> --- .../online_msg_to_mongo_handler.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a895bb9c4..631df7df2 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -62,3 +62,22 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont // msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) //} } + +func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } + +func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + +func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group + log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", + claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) + for msg := range claim.Messages() { + ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) + if len(msg.Value) != 0 { + mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess) + } else { + log.ZError(ctx, "mongo msg get from kafka but is nil", nil, "conversationID", msg.Key) + } + sess.MarkMessage(msg, "") + } + return nil +}