// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msgtransfer
import (
"context"
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
)
// ModifyMsgConsumerHandler consumes "message to modify" records from Kafka and
// applies the reaction-extension changes they carry through an
// ExtendMsgDatabase.
type ModifyMsgConsumerHandler struct {
	// modifyMsgConsumerGroup is the Kafka consumer group the handler reads
	// from; it is also used to derive a context from each consumed record.
	modifyMsgConsumerGroup *kfk.MConsumerGroup
	// extendMsgDatabase is the storage backend that receives the
	// reaction-extension inserts, updates and deletes.
	extendMsgDatabase controller.ExtendMsgDatabase
	// extendSetMsgModel is never assigned by the constructor; its zero value
	// is only used to call Pb2Model when converting notification extensions.
	extendSetMsgModel unRelationTb.ExtendMsgSetModel
}
// NewModifyMsgConsumerHandler returns a handler whose consumer group is
// subscribed to the configured MsgToModify topic and which writes through the
// supplied database.
//
// The consumer group is configured for Kafka protocol version 2.0.0.0, uses
// the newest offset as its initial offset, and has IsReturnErr set to false.
func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyMsgConsumerHandler {
	groupConfig := &kfk.MConsumerGroupConfig{
		KafkaVersion:   sarama.V2_0_0_0,
		OffsetsInitial: sarama.OffsetNewest,
		IsReturnErr:    false,
	}
	topics := []string{config.Config.Kafka.MsgToModify.Topic}
	consumerGroup := kfk.NewMConsumerGroup(
		groupConfig,
		topics,
		config.Config.Kafka.Addr,
		config.Config.Kafka.ConsumerGroupID.MsgToModify,
	)

	handler := &ModifyMsgConsumerHandler{extendMsgDatabase: database}
	handler.modifyMsgConsumerGroup = consumerGroup
	return handler
}
// Setup is the session-start hook with the signature sarama expects of a
// consumer group handler. This handler needs no per-session preparation, so it
// does nothing and always returns nil.
func (ModifyMsgConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook with the signature sarama expects of a
// consumer group handler. This handler holds no per-session resources, so it
// does nothing and always returns nil.
func (ModifyMsgConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim drains the claim's message channel until it is closed.
//
// For each record it derives a context from the record, logs the record at
// debug level, and hands non-empty payloads to ModifyMsg; an empty payload is
// only logged as an error. Every record is marked on the session afterwards,
// whatever happened while processing it. The method always returns nil.
func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	records := claim.Messages()
	for record := range records {
		ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(record)
		log.ZDebug(ctx, "kafka get info to mysql", "ModifyMsgConsumerHandler", record.Topic, "msgPartition", record.Partition, "msg", string(record.Value), "key", string(record.Key))

		switch {
		case len(record.Value) == 0:
			// Nothing to decode; report the empty payload and move on.
			log.ZError(ctx, "msg get from kafka but is nil", nil, "key", record.Key)
		default:
			mmc.ModifyMsg(ctx, record, string(record.Key), sess)
		}

		// ModifyMsg reports no result, so the record is marked unconditionally.
		sess.MarkMessage(record, "")
	}
	return nil
}
// ModifyMsg decodes the payload of one Kafka record into a
// pbMsg.MsgDataToModifyByMQ batch and persists the reaction-extension changes
// carried by its messages.
//
// A message is processed only when the constant.IsReactionFromCache switch is
// set in its options. Messages of content type constant.ReactionMessageModifier
// insert or update extensions, messages of content type
// constant.ReactionMessageDeleter delete them, and any other content type is
// ignored.
//
// Failures are logged and never abort the batch: a payload that cannot be
// decoded ends the call early, while a failing message is skipped so the
// remaining messages are still processed. Nothing is returned to the caller.
//
// msgKey and the session argument are not used; they are kept so existing
// callers continue to compile.
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
	msgFromMQ := pbMsg.MsgDataToModifyByMQ{}
	if err := proto.Unmarshal(cMsg.Value, &msgFromMQ); err != nil {
		log.ZError(ctx, "msg_transfer Unmarshal msg err", err, "msg", string(cMsg.Value))
		return
	}
	log.ZDebug(ctx, "proto.Unmarshal MsgDataToMQ", "msgs", msgFromMQ.String())

	// The operation ID does not change between messages, so it is attached
	// once here instead of re-wrapping the context on every iteration.
	ctx = mcontext.SetOperationID(ctx, mcontext.GetOperationID(ctx))

	for _, msg := range msgFromMQ.Messages {
		if !utils.GetSwitchFromOptions(msg.Options, constant.IsReactionFromCache) {
			continue
		}
		switch msg.ContentType {
		case constant.ReactionMessageModifier:
			mmc.applyReactionModifier(ctx, msg.Content)
		case constant.ReactionMessageDeleter:
			mmc.applyReactionDeleter(ctx, msg.Content)
		}
	}
}

// applyReactionModifier decodes a JSON ReactionMessageModifierNotification from
// content and stores its successful reaction extensions. Notifications flagged
// IsExternalExtensions are skipped. Errors are logged, not returned.
func (mmc *ModifyMsgConsumerHandler) applyReactionModifier(ctx context.Context, content []byte) {
	notification := &sdkws.ReactionMessageModifierNotification{}
	if err := json.Unmarshal(content, notification); err != nil {
		log.ZError(ctx, "unmarshal ReactionMessageModifierNotification failed", err, "content", string(content))
		return
	}
	if notification.IsExternalExtensions {
		return
	}

	if notification.IsReact {
		// The message already has reaction data: merge into the existing set.
		err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions))
		if err != nil {
			log.ZError(ctx, "InsertOrUpdateReactionExtendMsgSet failed", err, "conversationID", notification.ConversationID, "sessionType", notification.SessionType, "clientMsgID", notification.ClientMsgID)
		}
		return
	}

	// First modification of this message: build a fresh extend-message record.
	reactionExtensionList := make(map[string]unRelationTb.KeyValueModel, len(notification.SuccessReactionExtensions))
	for _, v := range notification.SuccessReactionExtensions {
		reactionExtensionList[v.TypeKey] = unRelationTb.KeyValueModel{
			TypeKey:          v.TypeKey,
			Value:            v.Value,
			LatestUpdateTime: v.LatestUpdateTime,
		}
	}
	extendMsg := unRelationTb.ExtendMsgModel{
		ReactionExtensionList: reactionExtensionList,
		ClientMsgID:           notification.ClientMsgID,
		MsgFirstModifyTime:    notification.MsgFirstModifyTime,
	}
	if err := mmc.extendMsgDatabase.InsertExtendMsg(ctx, notification.ConversationID, notification.SessionType, &extendMsg); err != nil {
		log.ZError(ctx, "MsgFirstModify InsertExtendMsg failed", err, "conversationID", notification.ConversationID, "sessionType", notification.SessionType, "clientMsgID", notification.ClientMsgID)
	}
}

// applyReactionDeleter decodes a JSON ReactionMessageDeleteNotification from
// content and removes its successful reaction extensions from storage. Errors
// are logged, not returned.
func (mmc *ModifyMsgConsumerHandler) applyReactionDeleter(ctx context.Context, content []byte) {
	notification := &sdkws.ReactionMessageDeleteNotification{}
	if err := json.Unmarshal(content, notification); err != nil {
		log.ZError(ctx, "unmarshal ReactionMessageDeleteNotification failed", err, "content", string(content))
		return
	}
	err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions))
	if err != nil {
		log.ZError(ctx, "DeleteReactionExtendMsgSet failed", err, "conversationID", notification.ConversationID, "sessionType", notification.SessionType, "clientMsgID", notification.ClientMsgID)
	}
}