@ -14,7 +14,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -38,7 +37,6 @@ func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *Persiste
func ( pc * PersistentConsumerHandler ) handleChatWs2Mysql ( ctx context . Context , cMsg * sarama . ConsumerMessage , msgKey string , _ sarama . ConsumerGroupSession ) {
msg := cMsg . Value
operationID := mcontext . GetOperationID ( ctx )
var tag bool
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( msg , & msgFromMQ )
@ -46,6 +44,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs
log . ZError ( ctx , "msg_transfer Unmarshal msg err" , err )
return
}
return
log . ZDebug ( ctx , "handleChatWs2Mysql" , "msg" , msgFromMQ . MsgData )
//Control whether to store history messages (mysql)
isPersist := utils . GetSwitchFromOptions ( msgFromMQ . MsgData . Options , constant . IsPersistent )
@ -64,9 +63,9 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs
tag = true
}
if tag {
log . NewInfo( operationID , "msg_transfer msg persistin g", string ( msg ) )
log . ZInfo( ctx , "msg_transfer msg persisting" , "ms g", string ( msg ) )
if err = pc . chatLogDatabase . CreateChatLog ( & msgFromMQ ) ; err != nil {
log . NewError( operationID , "Message insert failed" , "err" , err . Error ( ) , "msg" , msgFromMQ . String ( ) )
log . ZError( ctx , "Message insert failed" , err , "msg" , msgFromMQ . String ( ) )
return
}
}
@ -76,12 +75,12 @@ func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
func ( PersistentConsumerHandler ) Cleanup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( pc * PersistentConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error {
for msg := range claim . Messages ( ) {
log . NewDebug ( "" , "kafka get info to mysql" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "msg" , string ( msg . Value ) , "key" , string ( msg . Key ) )
ctx := pc . persistentConsumerGroup . GetContextFromMsg ( msg )
log . ZDebug ( ctx , "kafka get info to mysql" , "msgTopic" , msg . Topic , "msgPartition" , msg . Partition , "msg" , string ( msg . Value ) , "key" , string ( msg . Key ) )
if len ( msg . Value ) != 0 {
ctx := pc . persistentConsumerGroup . GetContextFromMsg ( msg )
pc . handleChatWs2Mysql ( ctx , msg , string ( msg . Key ) , sess )
} else {
log . Error( "" , "msg get from kafka but is nil ", msg . Key )
log . ZError( ctx , "msg get from kafka but is nil ", nil , "key ", msg . Key )
}
sess . MarkMessage ( msg , "" )
}