From ee0a71de1b306abe3b3e2490af63a4b450903b85 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 19 May 2022 12:25:46 +0800 Subject: [PATCH] concurrent consumption of messages --- config/config.yaml | 4 +- internal/msg_transfer/logic/db.go | 20 ++- internal/msg_transfer/logic/init.go | 1 + .../logic/online_history_msg_handler.go | 141 +++++++++++++++++- pkg/common/config/config.go | 3 +- 5 files changed, 151 insertions(+), 18 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 736b15bce..0f1c5c4ec 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -222,7 +222,9 @@ secret: tuoyun multiloginpolicy: 1 #chat log insert to db -chatPersistenceMysql: true +chatpersistencemysql: true +#可靠性存储 +reliablestorage: true #token config tokenpolicy: diff --git a/internal/msg_transfer/logic/db.go b/internal/msg_transfer/logic/db.go index 806b782c0..98a684cc7 100644 --- a/internal/msg_transfer/logic/db.go +++ b/internal/msg_transfer/logic/db.go @@ -2,22 +2,20 @@ package logic import ( "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/chat" - "Open_IM/pkg/utils" ) func saveUserChat(uid string, msg *pbMsg.MsgDataToMQ) error { - time := utils.GetCurrentTimestampByMill() - seq, err := db.DB.IncrUserSeq(uid) - if err != nil { - log.NewError(msg.OperationID, "data insert to redis err", err.Error(), msg.String()) - return err - } - msg.MsgData.Seq = uint32(seq) + //time := utils.GetCurrentTimestampByMill() + //seq, err := db.DB.IncrUserSeq(uid) + //if err != nil { + // log.NewError(msg.OperationID, "data insert to redis err", err.Error(), msg.String()) + // return err + //} + //msg.MsgData.Seq = uint32(seq) pbSaveData := pbMsg.MsgDataToDB{} pbSaveData.MsgData = msg.MsgData - log.NewInfo(msg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time) + //log.NewInfo(msg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time) return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) -// return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) + // return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) } diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 8f5cc0695..bef20e8c7 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -12,6 +12,7 @@ import ( const OnlineTopicBusy = 1 const OnlineTopicVacancy = 0 +const Msg = 2 var ( persistentCH PersistentConsumerHandler diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index dd1f22634..fb94ffe5f 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -3,6 +3,7 @@ package logic import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -17,7 +18,11 @@ import ( "time" ) -type fcb func(msg []byte, msgKey string) +type MsgChannelValue struct { + userID string + msg *pbMsg.MsgDataToMQ +} +type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) type Cmd2Value struct { Cmd int Value interface{} @@ -26,12 +31,22 @@ type OnlineHistoryConsumerHandler struct { msgHandle map[string]fcb historyConsumerGroup *kfk.MConsumerGroup cmdCh chan Cmd2Value + msgCh chan Cmd2Value } func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle = make(map[string]fcb) och.cmdCh = cmdCh - och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo + och.msgCh = make(chan Cmd2Value, 1000) + if config.Config.ReliableStorage { + och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo + } else { + och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability + for i := 0; i < 10; i++ { + go och.Run() + + } + } och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) @@ -61,7 +76,28 @@ func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { return errors.New("send cmd timeout") } } -func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { +func (och *OnlineHistoryConsumerHandler) Run() { + for { + select { + case cmd := <-och.msgCh: + switch cmd.Cmd { + case Msg: + msgChannelValue := cmd.Value.(MsgChannelValue) + err := saveUserChat(msgChannelValue.userID, msgChannelValue.msg) + if err != nil { + singleMsgFailedCount++ + log.NewError(msgChannelValue.msg.OperationID, "single data insert to mongo err", err.Error(), msgChannelValue.msg.String()) + } else { + singleMsgSuccessCountMutex.Lock() + singleMsgSuccessCount++ + singleMsgSuccessCountMutex.Unlock() + } + } + } + } +} +func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { + msg := cMsg.Value now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -126,6 +162,102 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) return } + sess.MarkMessage(cMsg, "") + log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) +} +func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { + msg := cMsg.Value + now := time.Now() + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) + return + } + operationID := msgFromMQ.OperationID + log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) + //Control whether to store offline messages (mongo) + isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) + //Control whether to store history messages (mysql) + isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) + isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) + switch msgFromMQ.MsgData.SessionType { + case constant.SingleChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) + if isHistory { + seq, err := db.DB.IncrUserSeq(msgKey) + if err != nil { + log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) + return + } + sess.MarkMessage(cMsg, "") + msgFromMQ.MsgData.Seq = uint32(seq) + log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) + och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} + //err := saveUserChat(msgKey, &msgFromMQ) + //if err != nil { + // singleMsgFailedCount++ + // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + // return + //} + //singleMsgSuccessCountMutex.Lock() + //singleMsgSuccessCount++ + //singleMsgSuccessCountMutex.Unlock() + //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } + if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { + } else { + go sendMessageToPush(&msgFromMQ, msgKey) + } + log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + case constant.GroupChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) + if isHistory { + seq, err := db.DB.IncrUserSeq(msgKey) + if err != nil { + log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) + return + } + sess.MarkMessage(cMsg, "") + msgFromMQ.MsgData.Seq = uint32(seq) + log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) + och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} + //err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) + //if err != nil { + // log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) + // return + //} + //groupMsgCount++ + } + go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) + case constant.NotificationChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) + if isHistory { + seq, err := db.DB.IncrUserSeq(msgKey) + if err != nil { + log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) + return + } + sess.MarkMessage(cMsg, "") + msgFromMQ.MsgData.Seq = uint32(seq) + log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) + och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, &msgFromMQ}} + //err := saveUserChat(msgKey, &msgFromMQ) + //if err != nil { + // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + // return + //} + //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } + if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { + } else { + go sendMessageToPush(&msgFromMQ, msgKey) + } + log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + default: + log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) + return + } log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } @@ -138,8 +270,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS SetOnlineTopicStatus(OnlineTopicBusy) //och.TriggerCmd(OnlineTopicBusy) log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) - och.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") + och.msgHandle[msg.Topic](msg, string(msg.Key), sess) if claim.HighWaterMarkOffset()-msg.Offset <= 1 { log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset) SetOnlineTopicStatus(OnlineTopicVacancy) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index e898e5aa6..b7454c37b 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -209,7 +209,8 @@ type config struct { } Secret string `yaml:"secret"` MultiLoginPolicy int `yaml:"multiloginpolicy"` - ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"` + ChatPersistenceMysql bool `yaml:"chatpersistencemysql"` + ReliableStorage bool `yaml:"reliablestorage"` TokenPolicy struct { AccessSecret string `yaml:"accessSecret"`