diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index b80853436..4ea631b07 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -18,6 +18,7 @@ import ( "math/rand" "strconv" "strings" + "sync" "time" ) @@ -255,72 +256,55 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } case constant.MemberQuitNotification: addUidList = append(addUidList, pb.MsgData.SendID) - case constant.AtText: - go func() { - var conversationReq pbConversation.ModifyConversationFieldReq - var tag bool - var atUserID []string - conversation := pbConversation.Conversation{ - OwnerUserID: pb.MsgData.SendID, - ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType), - ConversationType: constant.GroupChatType, - GroupID: pb.MsgData.GroupID, - } - conversationReq.Conversation = &conversation - conversationReq.OperationID = pb.OperationID - conversationReq.FieldType = constant.FieldGroupAtType - tagAll := utils.IsContain(constant.AtAllString, pb.MsgData.AtUserIDList) - if tagAll { - atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList) - if len(atUserID) == 0 { //just @everyone - conversationReq.UserIDList = memberUserIDList - conversation.GroupAtType = constant.AtAll - } else { //@Everyone and @other people - conversationReq.UserIDList = atUserID - conversation.GroupAtType = constant.AtAllAtMe - tag = true + + default: + } + groupID := pb.MsgData.GroupID + //split parallel send + var wg sync.WaitGroup + var sendTag bool + var split = 50 + remain := len(memberUserIDList) % split + for i := 0; i < len(memberUserIDList)/split; i++ { + wg.Add(1) + go func(list []string) { + for _, v := range list { + pb.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) + if isSend { + msgToMQ.MsgData = pb.MsgData + err := rpc.sendMsgToKafka(&msgToMQ, v) + if err != nil { + log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + } else { + sendTag = true + } } - } else { - conversationReq.UserIDList = pb.MsgData.AtUserIDList - conversation.GroupAtType = constant.AtMe - } - etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName) - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) - if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) } - if tag { - conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList) - conversation.GroupAtType = constant.AtAll - etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName) - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) - if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) + wg.Done() + }(memberUserIDList[i*split : (i+1)*split]) + } + if remain > 0 { + wg.Add(1) + go func(list []string) { + for _, v := range list { + pb.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) + if isSend { + msgToMQ.MsgData = pb.MsgData + err := rpc.sendMsgToKafka(&msgToMQ, v) + if err != nil { + log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + } else { + sendTag = true + } } } - }() - default: + wg.Done() + }(memberUserIDList[split*(len(memberUserIDList)/split):]) } - groupID := pb.MsgData.GroupID - for _, v := range memberUserIDList { - pb.MsgData.RecvID = v - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) - if isSend { - msgToMQ.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQ, v) - if err != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - } + wg.Wait() - } log.Info(msgToMQ.OperationID, "addUidList", addUidList) for _, v := range addUidList { pb.MsgData.RecvID = v @@ -331,7 +315,8 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S err := rpc.sendMsgToKafka(&msgToMQ, v) if err != nil { log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } else { + sendTag = true } } } @@ -339,7 +324,63 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if err := callbackAfterSendGroupMsg(pb); err != nil { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error()) } - return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + if !sendTag { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } else { + if pb.MsgData.ContentType == constant.AtText { + go func() { + var conversationReq pbConversation.ModifyConversationFieldReq + var tag bool + var atUserID []string + conversation := pbConversation.Conversation{ + OwnerUserID: pb.MsgData.SendID, + ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType), + ConversationType: constant.GroupChatType, + GroupID: pb.MsgData.GroupID, + } + conversationReq.Conversation = &conversation + conversationReq.OperationID = pb.OperationID + conversationReq.FieldType = constant.FieldGroupAtType + tagAll := utils.IsContain(constant.AtAllString, pb.MsgData.AtUserIDList) + if tagAll { + atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList) + if len(atUserID) == 0 { //just @everyone + conversationReq.UserIDList = memberUserIDList + conversation.GroupAtType = constant.AtAll + } else { //@Everyone and @other people + conversationReq.UserIDList = atUserID + conversation.GroupAtType = constant.AtAllAtMe + tag = true + } + } else { + conversationReq.UserIDList = pb.MsgData.AtUserIDList + conversation.GroupAtType = constant.AtMe + } + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName) + client := pbConversation.NewConversationClient(etcdConn) + conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) + if err != nil { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) + } else if conversationReply.CommonResp.ErrCode != 0 { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) + } + if tag { + conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList) + conversation.GroupAtType = constant.AtAll + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName) + client := pbConversation.NewConversationClient(etcdConn) + conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) + if err != nil { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) + } else if conversationReply.CommonResp.ErrCode != 0 { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) + } + } + }() + } + return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + + } case constant.NotificationChatType: msgToMQ.MsgData = pb.MsgData log.NewInfo(msgToMQ.OperationID, msgToMQ)