diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a895bb9c4..631df7df2 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -62,3 +62,22 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont // msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) //} } + +func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } + +func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + +func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group + log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", + claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) + for msg := range claim.Messages() { + ctx := mc.historyConsumerGroup.GetContextFromMsg(msg) + if len(msg.Value) != 0 { + mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess) + } else { + log.ZError(ctx, "mongo msg get from kafka but is nil", nil, "conversationID", msg.Key) + } + sess.MarkMessage(msg, "") + } + return nil +}