diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index d9bb49344..1736baf83 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -77,7 +77,7 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull } else { log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) } - msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(k, failedSeqList, in.OperationID) + msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) if err1 != nil { log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) resp.ErrCode = 201 diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 97d8885a6..db6406d1f 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -306,6 +306,57 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio } return seqMsg, nil } +func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { + var hasSeqList []uint32 + singleCount := 0 + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + + m := func(uid string, seqList []uint32) map[string][]uint32 { + t := make(map[string][]uint32) + for i := 0; i < len(seqList); i++ { + seqUid := getSeqUid(uid, seqList[i]) + if value, ok := t[seqUid]; !ok { + var temp []uint32 + t[seqUid] = append(temp, seqList[i]) + } else { + t[seqUid] = append(value, seqList[i]) + } + } + return t + }(groupID, seqList) + sChat := UserChat{} + for seqUid, value := range m { + if err = c.FindOne(ctx, bson.M{"uid": seqUid}).Decode(&sChat); err != nil { + log.NewError(operationID, "not find seqGroupID", seqUid, value, groupID, seqList, err.Error()) + continue + } + singleCount = 0 + for i := 0; i < len(sChat.Msg); i++ { + msg := new(open_im_sdk.MsgData) + if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil { + log.NewError(operationID, "Unmarshal err", seqUid, value, groupID, seqList, err.Error()) + return nil, err + } + if isContainInt32(msg.Seq, value) { + seqMsg = append(seqMsg, msg) + hasSeqList = append(hasSeqList, msg.Seq) + singleCount++ + if singleCount == len(value) { + break + } + } + } + } + if len(hasSeqList) != len(seqList) { + var diff []uint32 + diff = utils.Difference(hasSeqList, seqList) + exceptionMSg := genExceptionSuperGroupMessageBySeqList(diff, groupID) + seqMsg = append(seqMsg, exceptionMSg...) + + } + return seqMsg, nil +} func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) @@ -351,6 +402,17 @@ func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk return exceptionMsg } +func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (exceptionMsg []*open_im_sdk.MsgData) { + for _, v := range seqList { + msg := new(open_im_sdk.MsgData) + msg.Seq = v + msg.GroupID = groupID + msg.SessionType = constant.SuperGroupChatType + exceptionMsg = append(exceptionMsg, msg) + } + return exceptionMsg +} + func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)