流程更新后的代码整改

pull/270/head
x-shadow-man 3 years ago
parent 73118ce447
commit b4cc62db92

@ -3,18 +3,14 @@ package logic
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
kfk "Open_IM/pkg/common/kafka" kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbMsg "Open_IM/pkg/proto/msg" pbMsg "Open_IM/pkg/proto/msg"
pbPush "Open_IM/pkg/proto/push" pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"hash/crc32" "hash/crc32"
"strings"
"sync" "sync"
"time" "time"
) )
@ -49,12 +45,12 @@ func (och *OnlineHistoryRedisConsumerHandler) Init(cmdCh chan Cmd2Value) {
och.chArrays[i] = make(chan Cmd2Value, 50) och.chArrays[i] = make(chan Cmd2Value, 50)
go och.Run(i) go och.Run(i)
} }
if config.Config.ReliableStorage { //if config.Config.ReliableStorage {
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo // och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo
} else { //} else {
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability // och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability
//
} //}
och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
@ -227,117 +223,118 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
} }
} }
func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
now := time.Now()
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return
}
operationID := msgFromMQ.OperationID
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
switch msgFromMQ.MsgData.SessionType {
case constant.SingleChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
singleMsgFailedCount++
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount++
singleMsgSuccessCountMutex.Unlock()
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now))
case constant.GroupChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
if err != nil {
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
return
}
groupMsgCount++
}
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now))
case constant.NotificationChatType: //func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) // msg := cMsg.Value
if isHistory { // now := time.Now()
err := saveUserChat(msgKey, &msgFromMQ) // msgFromMQ := pbMsg.MsgDataToMQ{}
if err != nil { // err := proto.Unmarshal(msg, &msgFromMQ)
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) // if err != nil {
return // log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
} // return
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) // }
} // operationID := msgFromMQ.OperationID
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { // log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
} else { // //Control whether to store offline messages (mongo)
go sendMessageToPush(&msgFromMQ, msgKey) // isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
} // //Control whether to store history messages (mysql)
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) // isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
default: // isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) // switch msgFromMQ.MsgData.SessionType {
return // case constant.SingleChatType:
} // log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
sess.MarkMessage(cMsg, "") // if isHistory {
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) // err := saveUserChat(msgKey, &msgFromMQ)
} // if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
// }
// singleMsgSuccessCountMutex.Lock()
// singleMsgSuccessCount++
// singleMsgSuccessCountMutex.Unlock()
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
// }
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
// } else {
// go sendMessageToPush(&msgFromMQ, msgKey)
// }
// log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now))
// case constant.GroupChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
// if err != nil {
// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
// return
// }
// groupMsgCount++
// }
// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
// log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now))
//
// case constant.NotificationChatType:
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
// if isHistory {
// err := saveUserChat(msgKey, &msgFromMQ)
// if err != nil {
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
// }
// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
// }
// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
// } else {
// go sendMessageToPush(&msgFromMQ, msgKey)
// }
// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
// default:
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return
// }
// sess.MarkMessage(cMsg, "")
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
//}
func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { //func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value // msg := cMsg.Value
msgFromMQ := pbMsg.MsgDataToMQ{} // msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ) // err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil { // if err != nil {
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) // log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return // return
} // }
operationID := msgFromMQ.OperationID // operationID := msgFromMQ.OperationID
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) // log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo) // //Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) // isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) // isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
if isHistory { // if isHistory {
seq, err := db.DB.IncrUserSeq(msgKey) // seq, err := db.DB.IncrUserSeq(msgKey)
if err != nil { // if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) // log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return // return
} // }
sess.MarkMessage(cMsg, "") // sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq) // msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) // log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
//och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} // //och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ) // //err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil { // //if err != nil {
// singleMsgFailedCount++ // // singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) // // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return // // return
//} // //}
//singleMsgSuccessCountMutex.Lock() // //singleMsgSuccessCountMutex.Lock()
//singleMsgSuccessCount++ // //singleMsgSuccessCount++
//singleMsgSuccessCountMutex.Unlock() // //singleMsgSuccessCountMutex.Unlock()
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) // //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} else { // } else {
if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { // if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
go sendMessageToPush(&msgFromMQ, msgKey) // go sendMessageToPush(&msgFromMQ, msgKey)
} // }
} // }
} //}
func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
@ -515,32 +512,32 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
// return nil // return nil
//} //}
func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { //func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) // log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String())
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} // rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} // mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID) // grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID)
if grpcConn == nil { // if grpcConn == nil {
log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) // log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String())
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) // pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil { // if err != nil {
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) // log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
} // }
return // return
} // }
msgClient := pbPush.NewPushMsgServiceClient(grpcConn) // msgClient := pbPush.NewPushMsgServiceClient(grpcConn)
_, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) // _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg)
if err != nil { // if err != nil {
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) // log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) // pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil { // if err != nil {
log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) // log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error())
} // }
} else { // } else {
log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String()) // log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String())
//
} // }
} //}
func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) { func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) {
log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID) log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID)

@ -29,7 +29,7 @@ var (
func Init(rpcPort int) { func Init(rpcPort int) {
rpcServer.Init(rpcPort) //rpcServer.Init(rpcPort)
pushCh.Init() pushCh.Init()
pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID} pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID}
} }
@ -49,6 +49,6 @@ func init() {
} }
func Run() { func Run() {
go rpcServer.run() //go rpcServer.run()
go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh)
} }

@ -63,7 +63,7 @@ func (r *RPCServer) run() {
} }
} }
func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPush.PushMsgResp, error) { func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPush.PushMsgResp, error) {
//Call push module to send message to the user //Call push module to send message to the user, but the service is not currently used
switch pbData.MsgData.SessionType { switch pbData.MsgData.SessionType {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
MsgToSuperGroupUser(pbData) MsgToSuperGroupUser(pbData)

Loading…
Cancel
Save