diff --git a/src/common/db/mongoModel.go b/src/common/db/mongoModel.go index a6e2651fe..0c415d103 100644 --- a/src/common/db/mongoModel.go +++ b/src/common/db/mongoModel.go @@ -8,11 +8,13 @@ import ( "errors" "github.com/golang/protobuf/proto" "gopkg.in/mgo.v2/bson" + "strconv" "time" ) const cChat = "chat" const cGroup = "group" +const singleGocMsgNum = 10000 type MsgInfo struct { SendTime int64 @@ -86,7 +88,8 @@ func (d *DataBases) GetMsgBySeqRange(uid string, seqBegin, seqEnd int64) (Single return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil } func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { - count := 0 + allCount := 0 + singleCount := 0 session := d.mgoSession.Clone() if session == nil { return nil, nil, MaxSeq, MinSeq, errors.New("session == nil") @@ -94,54 +97,75 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*p defer session.Close() c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - + m := func(uid string, seqList []int64) map[string][]int64 { + t := make(map[string][]int64) + for i := 0; i < len(seqList); i++ { + seqUid := getSeqUid(uid, seqList[i]) + if value, ok := t[seqUid]; !ok { + var temp []int64 + t[seqUid] = append(temp, seqList[i]) + } else { + t[seqUid] = append(value, seqList[i]) + } + } + return t + }(uid, seqList) sChat := UserChat{} - if err = c.Find(bson.M{"uid": uid}).One(&sChat); err != nil { - return nil, nil, MaxSeq, MinSeq, err - } pChat := pbMsg.MsgSvrToPushSvrChatMsg{} - for i := 0; i < len(sChat.Msg); i++ { - temp := new(pbMsg.MsgFormat) - if err = proto.Unmarshal(sChat.Msg[i].Msg, &pChat); err != nil { - return nil, nil, MaxSeq, MinSeq, err + for seqUid, value := range m { + if err = c.Find(bson.M{"uid": seqUid}).One(&sChat); err != nil { + log.NewError("", "not find seqUid", seqUid, value, uid, seqList) + continue } - if isContainInt64(pChat.RecvSeq, seqList) { - temp.SendID = pChat.SendID - temp.RecvID = pChat.RecvID - temp.MsgFrom = pChat.MsgFrom - temp.Seq = pChat.RecvSeq - temp.ServerMsgID = pChat.MsgID - temp.SendTime = pChat.SendTime - temp.Content = pChat.Content - temp.ContentType = pChat.ContentType - temp.SenderPlatformID = pChat.PlatformID - temp.ClientMsgID = pChat.ClientMsgID - temp.SenderFaceURL = pChat.SenderFaceURL - temp.SenderNickName = pChat.SenderNickName - if pChat.RecvSeq > MaxSeq { - MaxSeq = pChat.RecvSeq + singleCount = 0 + for i := 0; i < len(sChat.Msg); i++ { + temp := new(pbMsg.MsgFormat) + if err = proto.Unmarshal(sChat.Msg[i].Msg, &pChat); err != nil { + log.NewError("", "not find seqUid", seqUid, value, uid, seqList) + return nil, nil, MaxSeq, MinSeq, err } - if count == 0 { - MinSeq = pChat.RecvSeq - } - if pChat.RecvSeq < MinSeq { - MinSeq = pChat.RecvSeq - } - if pChat.SessionType == constant.SingleChatType { - SingleMsg = append(SingleMsg, temp) - } else { - GroupMsg = append(GroupMsg, temp) - } - count++ - if count == len(seqList) { - break + if isContainInt64(pChat.RecvSeq, value) { + temp.SendID = pChat.SendID + temp.RecvID = pChat.RecvID + temp.MsgFrom = pChat.MsgFrom + temp.Seq = pChat.RecvSeq + temp.ServerMsgID = pChat.MsgID + temp.SendTime = pChat.SendTime + temp.Content = pChat.Content + temp.ContentType = pChat.ContentType + temp.SenderPlatformID = pChat.PlatformID + temp.ClientMsgID = pChat.ClientMsgID + temp.SenderFaceURL = pChat.SenderFaceURL + temp.SenderNickName = pChat.SenderNickName + if pChat.RecvSeq > MaxSeq { + MaxSeq = pChat.RecvSeq + } + if allCount == 0 { + MinSeq = pChat.RecvSeq + } + if pChat.RecvSeq < MinSeq { + MinSeq = pChat.RecvSeq + } + if pChat.SessionType == constant.SingleChatType { + SingleMsg = append(SingleMsg, temp) + } else { + GroupMsg = append(GroupMsg, temp) + } + allCount++ + singleCount++ + if singleCount == len(value) { + break + } } } + if allCount == len(seqList) { + break + } } - return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil } -func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) error { +func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgSvrToPushSvrChatMsg) error { + var seqUid string newTime := getCurrentTimestampByMill() session := d.mgoSession.Clone() if session == nil { @@ -150,8 +174,8 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) er defer session.Close() log.NewInfo("", "get mgoSession cost time", getCurrentTimestampByMill()-newTime) c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - - n, err := c.Find(bson.M{"uid": uid}).Count() + seqUid = getSeqUid(uid, m.RecvSeq) + n, err := c.Find(bson.M{"uid": seqUid}).Count() if err != nil { return err } @@ -161,17 +185,16 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) er if sMsg.Msg, err = proto.Marshal(m); err != nil { return err } - if n == 0 { sChat := UserChat{} - sChat.UID = uid + sChat.UID = seqUid sChat.Msg = append(sChat.Msg, sMsg) err = c.Insert(&sChat) if err != nil { return err } } else { - err = c.Update(bson.M{"uid": uid}, bson.M{"$push": bson.M{"msg": sMsg}}) + err = c.Update(bson.M{"uid": seqUid}, bson.M{"$push": bson.M{"msg": sMsg}}) if err != nil { return err } @@ -304,3 +327,7 @@ func isContainInt64(target int64, List []int64) bool { func getCurrentTimestampByMill() int64 { return time.Now().UnixNano() / 1e6 } +func getSeqUid(uid string, seq int64) string { + seqSuffix := seq / singleGocMsgNum + return uid + ":" + strconv.FormatInt(seqSuffix, 10) +}