diff --git a/config/config.yaml b/config/config.yaml
index 4d9569d07..3143f3a0b 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -55,8 +55,8 @@ kafka:
     addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可
     topic: "ms2ps_chat"
   consumergroupid:
+    msgToRedis: redis
     msgToMongo: mongo
-    msgToMongoOffline: mongo_offline
     msgToMySql: mysql
     msgToPush: push
diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go
index 65457878c..177745c6f 100644
--- a/internal/msg_transfer/logic/init.go
+++ b/internal/msg_transfer/logic/init.go
@@ -4,7 +4,6 @@ import (
 	"Open_IM/pkg/common/config"
 	"Open_IM/pkg/common/constant"
 	"Open_IM/pkg/common/kafka"
-	"Open_IM/pkg/common/log"
 	"Open_IM/pkg/statistics"
 	"fmt"
 	"sync"
@@ -20,7 +19,8 @@
 const ChannelNum = 100
 var (
 	persistentCH PersistentConsumerHandler
-	historyCH    OnlineHistoryConsumerHandler
+	historyCH      OnlineHistoryRedisConsumerHandler
+	historyMongoCH OnlineHistoryMongoConsumerHandler
 	producer        *kafka.Producer
 	producerToMongo *kafka.Producer
 	cmdCh           chan Cmd2Value
@@ -39,7 +39,6 @@ func Init() {
 	persistentCH.Init()
 	historyCH.Init(cmdCh)
 	onlineTopicStatus = OnlineTopicVacancy
-	log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic)
 	//offlineHistoryCH.Init(cmdCh)
 	statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
 	statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
@@ -54,6 +53,7 @@ func Run() {
 		fmt.Println("not start mysql consumer")
 	}
 	go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
+	go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
 	//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
 }
 func SetOnlineTopicStatus(status int) {
diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go
index a1544549d..908d28b27 100644
--- a/internal/msg_transfer/logic/online_history_msg_handler.go
+++ b/internal/msg_transfer/logic/online_history_msg_handler.go
@@ -9,10 +9,8 @@ import (
 	"Open_IM/pkg/grpc-etcdv3/getcdv3"
 	pbMsg "Open_IM/pkg/proto/chat"
 	pbPush "Open_IM/pkg/proto/push"
-	server_api_params "Open_IM/pkg/proto/sdk_ws"
 	"Open_IM/pkg/utils"
 	"context"
-	"errors"
 	"github.com/Shopify/sarama"
 	"github.com/golang/protobuf/proto"
 	"hash/crc32"
@@ -36,28 +34,21 @@ type Cmd2Value struct {
 	Cmd   int
 	Value interface{}
 }
-type OnlineHistoryConsumerHandler struct {
+type OnlineHistoryRedisConsumerHandler struct {
 	msgHandle            map[string]fcb
 	historyConsumerGroup *kfk.MConsumerGroup
-	cmdCh                chan Cmd2Value
 	chArrays             [ChannelNum]chan Cmd2Value
-	chMongoArrays        [ChannelNum]chan Cmd2Value
 	msgDistributionCh    chan Cmd2Value
 }
 
-func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
+func (och *OnlineHistoryRedisConsumerHandler) Init(cmdCh chan Cmd2Value) {
 	och.msgHandle = make(map[string]fcb)
 	och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
 	go och.MessagesDistributionHandle()
-	och.cmdCh = cmdCh
 	for i := 0; i < ChannelNum; i++ {
 		och.chArrays[i] = make(chan Cmd2Value, 50)
 		go och.Run(i)
 	}
-	for i := 0; i < ChannelNum; i++ {
-		och.chMongoArrays[i] = make(chan Cmd2Value, 10000)
-		go och.MongoMessageRun(i)
-	}
 	if config.Config.ReliableStorage {
 		och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo
 	} else {
@@ -66,34 +57,10 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
 	och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
-		config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
-
-}
-func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) {
-	operationID := utils.OperationIDGenerator()
-	err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1)
-	if err != nil {
-		log.Error(operationID, "TriggerCmd failed ", err.Error(), status)
-		return
-	}
-	log.Debug(operationID, "TriggerCmd success", status)
+		config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
 }
-func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error {
-	var flag = 0
-	select {
-	case ch <- value:
-		flag = 1
-	case <-time.After(time.Second * time.Duration(timeout)):
-		flag = 2
-	}
-	if flag == 1 {
-		return nil
-	} else {
-		return errors.New("send cmd timeout")
-	}
-}
-func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
+func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
 	for {
 		select {
 		case cmd := <-och.chArrays[channelID]:
@@ -154,7 +121,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
 		}
 	}
 }
-func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
+func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
 	pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
 	if err != nil {
 		log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
@@ -167,47 +134,48 @@ func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID stri
 	}
 	////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
 	//och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
 }
-func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
-	for {
-		select {
-		case cmd := <-och.chMongoArrays[channelID]:
-			switch cmd.Cmd {
-			case MongoMessages:
-				msgChannelValue := cmd.Value.(MsgChannelValue)
-				msgList := msgChannelValue.msgList
-				triggerID := msgChannelValue.triggerID
-				aggregationID := msgChannelValue.aggregationID
-				lastSeq := msgChannelValue.lastSeq
-				err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
-				if err != nil {
-					log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
-				}
-				for _, v := range msgList {
-					if v.MsgData.ContentType == constant.DeleteMessageNotification {
-						tips := server_api_params.TipsComm{}
-						DeleteMessageTips := server_api_params.DeleteMessageTips{}
-						err := proto.Unmarshal(v.MsgData.Content, &tips)
-						if err != nil {
-							log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
-							continue
-						}
-						err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
-						if err != nil {
-							log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
-							continue
-						}
-						if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
-							log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
-						}
-					}
-				}
-			}
-		}
-	}
-}
+//func (och *OnlineHistoryRedisConsumerHandler) MongoMessageRun(channelID int) {
+//	for {
+//		select {
+//		case cmd := <-och.chMongoArrays[channelID]:
+//			switch cmd.Cmd {
+//			case MongoMessages:
+//				msgChannelValue := cmd.Value.(MsgChannelValue)
+//				msgList := msgChannelValue.msgList
+//				triggerID := msgChannelValue.triggerID
+//				aggregationID := msgChannelValue.aggregationID
+//				lastSeq := msgChannelValue.lastSeq
+//				err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
+//				if err != nil {
+//					log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
+//				}
+//				for _, v := range msgList {
+//					if v.MsgData.ContentType == constant.DeleteMessageNotification {
+//						tips := server_api_params.TipsComm{}
+//						DeleteMessageTips := server_api_params.DeleteMessageTips{}
+//						err := proto.Unmarshal(v.MsgData.Content, &tips)
+//						if err != nil {
+//							log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
+//							continue
+//						}
+//						err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
+//						if err != nil {
+//							log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
+//							continue
+//						}
+//						if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
+//							log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
+//						}
+//
+//					}
+//				}
+//			}
+//		}
+//	}
+//}
 
-func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
+func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
 	for {
 		aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
 		select {
@@ -253,7 +221,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
 	}
 }
 
-func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
+func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
 	msg := cMsg.Value
 	now := time.Now()
 	msgFromMQ := pbMsg.MsgDataToMQ{}
@@ -325,7 +293,7 @@ func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consumer
 	log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
 }
 
-func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
+func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
 	msg := cMsg.Value
 	msgFromMQ := pbMsg.MsgDataToMQ{}
 	err := proto.Unmarshal(msg, &msgFromMQ)
@@ -365,10 +333,10 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
 	}
 }
 
-func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
-func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
+func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
+func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
 
-//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
 //	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
 //	log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
 //	for msg := range claim.Messages() {
@@ -385,7 +353,7 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error
 //	return nil
 //}
 
-func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
 	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
 	for {
@@ -480,7 +448,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
 	return nil
 }
 
-//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
 //	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
 //
 //	for {
diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go
new file mode 100644
index 000000000..1d71b8a1c
--- /dev/null
+++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go
@@ -0,0 +1,79 @@
+package logic
+
+import (
+	"Open_IM/pkg/common/config"
+	"Open_IM/pkg/common/constant"
+	"Open_IM/pkg/common/db"
+	kfk "Open_IM/pkg/common/kafka"
+	"Open_IM/pkg/common/log"
+	pbMsg "Open_IM/pkg/proto/chat"
+	server_api_params "Open_IM/pkg/proto/sdk_ws"
+	"Open_IM/pkg/utils"
+	"github.com/Shopify/sarama"
+	"github.com/golang/protobuf/proto"
+)
+
+type OnlineHistoryMongoConsumerHandler struct {
+	msgHandle            map[string]fcb
+	historyConsumerGroup *kfk.MConsumerGroup
+}
+
+func (och *OnlineHistoryMongoConsumerHandler) Init(cmdCh chan Cmd2Value) {
+	och.msgHandle = make(map[string]fcb)
+	och.msgHandle[config.Config.Kafka.MsgToMongo.Topic] = och.handleChatWs2Mongo
+	och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
+		OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
+		config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
+
+}
+func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
+	msg := cMsg.Value
+	msgFromMQ := pbMsg.MsgDataToMongoByMQ{}
+	err := proto.Unmarshal(msg, &msgFromMQ)
+	if err != nil {
+		log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
+		return
+	}
+	err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
+	if err != nil {
+		log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList)
+	}
+	for _, v := range msgFromMQ.MessageList {
+		if v.MsgData.ContentType == constant.DeleteMessageNotification {
+			tips := server_api_params.TipsComm{}
+			DeleteMessageTips := server_api_params.DeleteMessageTips{}
+			err := proto.Unmarshal(v.MsgData.Content, &tips)
+			if err != nil {
+				log.NewError(msgFromMQ.TriggerID, "tips unmarshal err:", err.Error(), v.String())
+				continue
+			}
+			err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
+			if err != nil {
+				log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
+				continue
+			}
+			if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
+				log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
+			}
+
+		}
+	}
+}
+
+func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
+func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
+
+func (och *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
+	log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
+	for msg := range claim.Messages() {
+		log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
+		if len(msg.Value) != 0 {
+			och.msgHandle[msg.Topic](msg, string(msg.Key), sess)
+		} else {
+			log.Error("", "mongo msg get from kafka but is nil", msg.Key)
+		}
+		sess.MarkMessage(msg, "")
+	}
+	return nil
+}
diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go
index 185236744..de0d1597e 100644
--- a/pkg/common/config/config.go
+++ b/pkg/common/config/config.go
@@ -213,10 +213,10 @@ type config struct {
 			Topic string `yaml:"topic"`
 		}
 		ConsumerGroupID struct {
-			MsgToMongo        string `yaml:"msgToMongo"`
-			MsgToMongoOffline string `yaml:"msgToMongoOffline"`
-			MsgToMySql        string `yaml:"msgToMySql"`
-			MsgToPush         string `yaml:"msgToPush"`
+			MsgToRedis string `yaml:"msgToRedis"`
+			MsgToMongo string `yaml:"msgToMongo"`
+			MsgToMySql string `yaml:"msgToMySql"`
+			MsgToPush  string `yaml:"msgToPush"`
 		}
 	}
 	Secret string `yaml:"secret"`