diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index b0897aa3a..fddfa3034 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit b0897aa3abe719729c2ce099404f08806917bfe3 +Subproject commit fddfa30348d77fe7d5767ef410b1f30d042062bf diff --git a/internal/rpc/cache/cache.go b/internal/rpc/cache/cache.go index 9954cef2e..5fb32a2d3 100644 --- a/internal/rpc/cache/cache.go +++ b/internal/rpc/cache/cache.go @@ -11,10 +11,11 @@ import ( commonPb "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" - "google.golang.org/grpc" "net" "strconv" "strings" + + "google.golang.org/grpc" ) type cacheServer struct { @@ -127,8 +128,10 @@ func updateAllGroupMemberListToCache() error { continue } log.NewDebug("", utils.GetSelfFuncName(), "groupMemberIDList", groupMemberIDList) - if err := db.DB.AddGroupMemberToCache(groupID, groupMemberIDList...); err != nil { - log.NewError("", utils.GetSelfFuncName(), "AddGroupMemberToCache", err.Error()) + if len(groupMemberIDList) > 0 { + if err := db.DB.AddGroupMemberToCache(groupID, groupMemberIDList...); err != nil { + log.NewError("", utils.GetSelfFuncName(), "AddGroupMemberToCache", err.Error()) + } } } log.NewInfo("0", utils.GetSelfFuncName(), "ok") @@ -143,8 +146,11 @@ func updateAllFriendToCache(userList []db.User) error { log.NewError("0", utils.GetSelfFuncName(), err.Error()) continue } - if err := db.DB.AddFriendToCache(user.UserID, friendIDList...); err != nil { - log.NewError("0", utils.GetSelfFuncName(), err.Error()) + log.NewDebug("", utils.GetSelfFuncName(), "friendIDList", user.UserID, friendIDList) + if len(friendIDList) > 0 { + if err := db.DB.AddFriendToCache(user.UserID, friendIDList...); err != nil { + log.NewError("0", utils.GetSelfFuncName(), err.Error(), friendIDList, user.UserID) + } } } log.NewInfo("0", utils.GetSelfFuncName(), "ok") @@ -159,8 +165,11 @@ func updateAllBlackListToCache(userList []db.User) error { log.NewError("", utils.GetSelfFuncName(), err.Error()) continue } - if err := db.DB.AddBlackUserToCache(user.UserID, blackIDList...); err != nil { - log.NewError("0", utils.GetSelfFuncName(), err.Error()) + log.NewDebug("", utils.GetSelfFuncName(), "updateAllBlackListToCache", user.UserID, blackIDList) + if len(blackIDList) > 0 { + if err := db.DB.AddBlackUserToCache(user.UserID, blackIDList...); err != nil { + log.NewError("0", utils.GetSelfFuncName(), err.Error()) + } } } log.NewInfo("0", utils.GetSelfFuncName(), "ok") diff --git a/internal/rpc/msg/del_msg.go b/internal/rpc/msg/del_msg.go index daacb8533..d8b28b847 100644 --- a/internal/rpc/msg/del_msg.go +++ b/internal/rpc/msg/del_msg.go @@ -12,12 +12,20 @@ import ( func (rpc *rpcChat) DelMsgList(_ context.Context, req *commonPb.DelMsgListReq) (*commonPb.DelMsgListResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp := &commonPb.DelMsgListResp{} - if err := db.DB.DelMsgLogic(req.UserID, req.SeqList, req.OperationID); err != nil { + //if err := db.DB.DelMsgLogic(req.UserID, req.SeqList, req.OperationID); err != nil { + // log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelMsg failed", err.Error()) + // resp.ErrMsg = constant.ErrDB.ErrMsg + // resp.ErrCode = constant.ErrDB.ErrCode + // return resp, nil + //} + + if err := db.DB.DelMsgBySeqList(req.UserID, req.SeqList, req.OperationID); err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelMsg failed", err.Error()) resp.ErrMsg = constant.ErrDB.ErrMsg resp.ErrCode = constant.ErrDB.ErrCode return resp, nil } + log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String()) return resp, nil } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 4ea631b07..b38345c56 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -146,7 +146,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S // return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) rpc.encapsulateMsgData(pb.MsgData) log.Info("", "this is a test MsgData ", pb.MsgData) - msgToMQ := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} + msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} //options := utils.JsonStringToMap(pbData.Options) isHistory := utils.GetSwitchFromOptions(pb.MsgData.Options, constant.IsHistory) mReq := MsgCallBackReq{ @@ -186,18 +186,18 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) if isSend { - msgToMQ.MsgData = pb.MsgData - log.NewInfo(msgToMQ.OperationID, msgToMQ) - err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID) + msgToMQSingle.MsgData = pb.MsgData + log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) if err1 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } - if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID) + if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) if err2 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } @@ -205,7 +205,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if err := callbackAfterSendSingleMsg(pb); err != nil { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg failed", err.Error()) } - return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.GroupChatType: // callback canSend, err := callbackBeforeSendGroupMsg(pb) @@ -237,6 +237,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } return result }(reply.MemberList) + log.Debug(pb.OperationID, "GetGroupAllMember userID list", memberUserIDList) var addUidList []string switch pb.MsgData.ContentType { case constant.MemberKickedNotification: @@ -268,17 +269,25 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S for i := 0; i < len(memberUserIDList)/split; i++ { wg.Add(1) go func(list []string) { + // log.Debug(pb.OperationID, "split userID ", list) + groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} + *groupPB.MsgData = *pb.MsgData + *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo + msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} for _, v := range list { - pb.MsgData.RecvID = v - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) + groupPB.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &groupPB) if isSend { - msgToMQ.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQ, v) + msgToMQGroup.MsgData = groupPB.MsgData + // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) + err := rpc.sendMsgToKafka(&msgToMQGroup, v) if err != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) } else { sendTag = true } + } else { + log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) } } wg.Done() @@ -287,17 +296,25 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if remain > 0 { wg.Add(1) go func(list []string) { + // log.Debug(pb.OperationID, "split userID ", list) + groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} + *groupPB.MsgData = *pb.MsgData + *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo + msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} for _, v := range list { - pb.MsgData.RecvID = v - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) + groupPB.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &groupPB) if isSend { - msgToMQ.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQ, v) + msgToMQGroup.MsgData = groupPB.MsgData + // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) + err := rpc.sendMsgToKafka(&msgToMQGroup, v) if err != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) } else { sendTag = true } + } else { + log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) } } wg.Done() @@ -305,16 +322,16 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } wg.Wait() - log.Info(msgToMQ.OperationID, "addUidList", addUidList) + log.Info(msgToMQSingle.OperationID, "addUidList", addUidList) for _, v := range addUidList { pb.MsgData.RecvID = v isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) - log.Info(msgToMQ.OperationID, "isSend", isSend) + log.Info(msgToMQSingle.OperationID, "isSend", isSend) if isSend { - msgToMQ.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQ, v) + msgToMQSingle.MsgData = pb.MsgData + err := rpc.sendMsgToKafka(&msgToMQSingle, v) if err != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:UserId", v, msgToMQSingle.String()) } else { sendTag = true } @@ -378,26 +395,26 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } }() } - return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) } case constant.NotificationChatType: - msgToMQ.MsgData = pb.MsgData - log.NewInfo(msgToMQ.OperationID, msgToMQ) - err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID) + msgToMQSingle.MsgData = pb.MsgData + log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) if err1 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } - if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID) + if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) if err2 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } - return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) default: return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) } diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 51ad1c324..a196e984a 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -13,6 +13,7 @@ import ( "github.com/gogo/protobuf/sortkeys" "go.mongodb.org/mongo-driver/mongo/options" "math/rand" + "sync" //"github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" @@ -79,6 +80,53 @@ func (d *DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) { return 1, nil } +// deleteMsgByLogic +func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (err error) { + log.Debug(operationID, utils.GetSelfFuncName(), "args ", userID, seqList) + sortkeys.Uint32s(seqList) + suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 { + t := make(map[string][]uint32) + for i := 0; i < len(seqList); i++ { + seqUid := getSeqUid(uid, seqList[i]) + if value, ok := t[seqUid]; !ok { + var temp []uint32 + t[seqUid] = append(temp, seqList[i]) + } else { + t[seqUid] = append(value, seqList[i]) + } + } + return t + }(userID, seqList) + + var wg sync.WaitGroup + wg.Add(len(suffixUserID2SubSeqList)) + for k, v := range suffixUserID2SubSeqList { + go func(suffixUserID string, subSeqList []uint32, operationID string) { + if e := d.DelMsgBySeqListInOneDoc(suffixUserID, subSeqList, operationID); e != nil { + log.Error(operationID, "DelMsgBySeqListInOneDoc failed ", e.Error(), suffixUserID, subSeqList) + err = e + } + wg.Done() + }(k, v, operationID) + } + wg.Wait() + return err +} + +func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) error { + log.Debug(operationID, utils.GetSelfFuncName(), "args ", suffixUserID, seqList) + seqMsgList, indexList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID) + if err != nil { + return utils.Wrap(err, "") + } + for i, v := range seqMsgList { + if err := d.ReplaceMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil { + return utils.Wrap(err, "") + } + } + return nil +} + // deleteMsgByLogic func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string) error { sortkeys.Uint32s(seqList) @@ -96,6 +144,27 @@ func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string return nil } +func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error { + log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg) + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + s := fmt.Sprintf("msg.%d.msg", seqIndex) + log.NewDebug(operationID, utils.GetSelfFuncName(), seqIndex, s) + msg.Status = constant.MsgDeleted + bytes, err := proto.Marshal(msg) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "proto marshal failed ", err.Error(), msg.String()) + return utils.Wrap(err, "") + } + updateResult, err := c.UpdateOne(ctx, bson.M{"uid": suffixUserID}, bson.M{"$set": bson.M{s: bytes}}) + log.NewInfo(operationID, utils.GetSelfFuncName(), updateResult) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "UpdateOne", err.Error()) + return utils.Wrap(err, "") + } + return nil +} + func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error { log.NewInfo(operationID, utils.GetSelfFuncName(), uid, *msg) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) @@ -109,6 +178,7 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat log.NewError(operationID, utils.GetSelfFuncName(), "proto marshal", err.Error()) return utils.Wrap(err, "") } + updateResult, err := c.UpdateOne( ctx, bson.M{"uid": uid}, bson.M{"$set": bson.M{s: bytes}}) @@ -228,6 +298,36 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio return seqMsg, nil } +func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, err error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + sChat := UserChat{} + if err = c.FindOne(ctx, bson.M{"uid": suffixUserID}).Decode(&sChat); err != nil { + log.NewError(operationID, "not find seqUid", suffixUserID, err.Error()) + return nil, nil, utils.Wrap(err, "") + } + singleCount := 0 + var hasSeqList []uint32 + for i := 0; i < len(sChat.Msg); i++ { + msg := new(open_im_sdk.MsgData) + if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil { + log.NewError(operationID, "Unmarshal err", msg.String(), err.Error()) + return nil, nil, err + } + if isContainInt32(msg.Seq, seqList) { + indexList = append(indexList, i) + seqMsg = append(seqMsg, msg) + hasSeqList = append(hasSeqList, msg.Seq) + singleCount++ + if singleCount == len(seqList) { + break + } + } + } + + return seqMsg, indexList, nil +} + func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) { for _, v := range seqList { msg := new(open_im_sdk.MsgData) @@ -741,7 +841,7 @@ func getMsgIndex(seq uint32) int { seqSuffix := seq / singleGocMsgNum var index uint32 if seqSuffix == 0 { - index = (seq - seqSuffix*5000) - 1 + index = (seq - seqSuffix*singleGocMsgNum) - 1 } else { index = seq - seqSuffix*singleGocMsgNum }