From 2236074fd3bfad535bcf8624d24b846baac650a3 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Tue, 3 Sep 2024 16:01:35 +0800 Subject: [PATCH] feat: push to kafka log --- internal/msgtransfer/online_history_msg_handler.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8c1978d48..6de07cfbc 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -237,6 +237,10 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [ } func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) { + for _, storageMsg := range storageList { + log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String()) + } + och.toPushTopic(ctx, key, conversationID, notStorageList) var storageMessageList []*sdkws.MsgData for _, msg := range storageList { @@ -311,8 +315,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con } } -func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) { +func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { + log.ZDebug(ctx, "push msg to topic", "msg", v.message.String()) och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) } }