From d276be9594e5af47f6b888c9407a02638e3f3636 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sun, 22 May 2022 18:28:51 +0800 Subject: [PATCH] Reduce memory copies --- internal/rpc/msg/send_msg.go | 67 ++++++++++++++++++++++++++++++++++-- pkg/utils/map.go | 1 + 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 2df926486..cbb301956 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -277,11 +277,15 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S remain := len(v) % split for i := 0; i < len(v)/split; i++ { wg.Add(1) - go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg) + tmp := valueCopy(pb) + // go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg) + go rpc.sendMsgToGroupOptimization(v[i*split:(i+1)*split], tmp, k, &sendTag, &wg) } if remain > 0 { wg.Add(1) - go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg) + tmp := valueCopy(pb) + // go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg) + go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg) } } wg.Add(1) @@ -431,6 +435,29 @@ func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType i return true } +func modifyMessageByUserMessageReceiveOptoptimization(userID, sourceID string, sessionType int, operationID string, options *map[string]bool) bool { + conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) + opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID) + if err != nil && err != redis.ErrNil { + log.NewError(operationID, "GetSingleConversationMsgOpt from redis err", userID, conversationID, err.Error()) + return true + } + + switch opt { + case constant.ReceiveMessage: + return true + case constant.NotReceiveMessage: + return false + case constant.ReceiveNotNotifyMessage: + if *options == nil { + *options = make(map[string]bool, 10) + } + utils.SetSwitchFromOptions(*options, constant.IsOfflinePush, false) + return true + } + return true +} + type NotificationMsg struct { SendID string RecvID string @@ -748,6 +775,23 @@ func getOnlineAndOfflineUserIDList(memberList []string, m map[string][]string, o m[constant.OfflineStatus] = offlUserIDList } +func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq { + offlinePushInfo := sdk_ws.OfflinePushInfo{} + if pb.MsgData.OfflinePushInfo != nil { + offlinePushInfo = *pb.MsgData.OfflinePushInfo + } + msgData := sdk_ws.MsgData{} + msgData = *pb.MsgData + msgData.OfflinePushInfo = &offlinePushInfo + + options := make(map[string]bool, 10) + for key, value := range pb.MsgData.Options { + options[key] = value + } + msgData.Options = options + return &pbChat.SendMsgReq{Token: "", OperationID: pb.OperationID, MsgData: &msgData} +} + func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { // log.Debug(pb.OperationID, "split userID ", list) offlinePushInfo := sdk_ws.OfflinePushInfo{} @@ -783,3 +827,22 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s } wg.Done() } + +func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { + msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} + for _, v := range list { + groupPB.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) + if isSend { + err := rpc.sendMsgToKafka(&msgToMQGroup, v, status) + if err != nil { + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) + } else { + *sendTag = true + } + } else { + log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) + } + } + wg.Done() +} diff --git a/pkg/utils/map.go b/pkg/utils/map.go index 66ca27471..7a5fb2d6b 100644 --- a/pkg/utils/map.go +++ b/pkg/utils/map.go @@ -124,6 +124,7 @@ func GetSwitchFromOptions(Options map[string]bool, key string) (result bool) { } return false } + func SetSwitchFromOptions(options map[string]bool, key string, value bool) { if options == nil { options = make(map[string]bool, 5)