From d4f81f086cf496ae26eed03987e1a9ea0804f622 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 11:00:11 +0800 Subject: [PATCH] batch to mongo --- internal/msg_transfer/logic/db.go | 23 ++++++--- pkg/common/db/batch_insert_chat.go | 78 ++++++++++++++++++++++++++++++ pkg/common/db/mongoModel.go | 47 +++++++++++++++++- 3 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 pkg/common/db/batch_insert_chat.go diff --git a/internal/msg_transfer/logic/db.go b/internal/msg_transfer/logic/db.go index 98a684cc7..74106999c 100644 --- a/internal/msg_transfer/logic/db.go +++ b/internal/msg_transfer/logic/db.go @@ -2,20 +2,27 @@ package logic import ( "Open_IM/pkg/common/db" + "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" ) func saveUserChat(uid string, msg *pbMsg.MsgDataToMQ) error { - //time := utils.GetCurrentTimestampByMill() - //seq, err := db.DB.IncrUserSeq(uid) - //if err != nil { - // log.NewError(msg.OperationID, "data insert to redis err", err.Error(), msg.String()) - // return err - //} - //msg.MsgData.Seq = uint32(seq) + time := utils.GetCurrentTimestampByMill() + seq, err := db.DB.IncrUserSeq(uid) + if err != nil { + log.NewError(msg.OperationID, "data insert to redis err", err.Error(), msg.String()) + return err + } + msg.MsgData.Seq = uint32(seq) pbSaveData := pbMsg.MsgDataToDB{} pbSaveData.MsgData = msg.MsgData - //log.NewInfo(msg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time) + log.NewInfo(msg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time) return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) // return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) } + +func saveUserChatList(uid string, msgList []*pbMsg.MsgDataToMQ, operationID string) error { + log.Info(operationID, utils.GetSelfFuncName(), "args ", uid, len(msgList)) + return db.DB.BatchInsertChat(uid, msgList, operationID) +} diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/batch_insert_chat.go new file mode 100644 index 000000000..9c23244f6 --- /dev/null +++ b/pkg/common/db/batch_insert_chat.go @@ -0,0 +1,78 @@ +package db + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/log" + pbMsg "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" + "context" + "errors" + "github.com/garyburd/redigo/redis" + "github.com/golang/protobuf/proto" + "go.mongodb.org/mongo-driver/bson" + "time" +) + +func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error { + if len(msgList) > GetSingleGocMsgNum() { + return errors.New("too large") + } + currentMaxSeq, err := d.GetUserMaxSeq(userID) + if err == nil { + + } else if err == redis.ErrNil { + currentMaxSeq = 0 + } else { + return utils.Wrap(err, "") + } + + remain := currentMaxSeq % uint64(GetSingleGocMsgNum()) + insertCounter := uint64(0) + msgListToMongo := make([]MsgInfo, 0) + msgListToMongoNext := make([]MsgInfo, 0) + seqUid := "" + seqUidNext := "" + for _, m := range msgList { + currentMaxSeq++ + sMsg := MsgInfo{} + sMsg.SendTime = m.MsgData.SendTime + m.MsgData.Seq = uint32(currentMaxSeq) + if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { + return utils.Wrap(err, "") + } + if insertCounter < remain { + msgListToMongo = append(msgListToMongo, sMsg) + insertCounter++ + seqUid = getSeqUid(userID, uint32(currentMaxSeq)) + } else { + msgListToMongoNext = append(msgListToMongoNext, sMsg) + seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) + } + } + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + newTime := getCurrentTimestampByMill() + + if seqUid != "" { + filter := bson.M{"uid": seqUid} + log.NewDebug(operationID, "filter ", seqUid) + err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err() + if err != nil { + log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter) + return utils.Wrap(err, "") + } + } + + if seqUidNext != "" { + filter := bson.M{"uid": seqUidNext} + sChat := UserChat{} + sChat.UID = seqUidNext + sChat.Msg = msgListToMongoNext + if _, err = c.InsertOne(ctx, &sChat); err != nil { + log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) + return utils.Wrap(err, "") + } + } + log.NewDebug(operationID, "find mgo uid cost time", getCurrentTimestampByMill()-newTime) + return nil +} diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 08d605113..96206c6f2 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -31,6 +31,10 @@ const cWorkMoment = "work_moment" const cCommentMsg = "comment_msg" const singleGocMsgNum = 5000 +func GetSingleGocMsgNum() int { + return singleGocMsgNum +} + type MsgInfo struct { SendTime int64 Msg []byte @@ -351,7 +355,7 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD return utils.Wrap(err, "") } err = c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": sMsg}}).Err() - log.NewDebug(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime) + log.NewWarn(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime) if err != nil { sChat := UserChat{} sChat.UID = seqUid @@ -368,6 +372,47 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD return nil } +// +//func (d *DataBases) SaveUserChatListMongo2(uid string, sendTime int64, msgList []*pbMsg.MsgDataToDB) error { +// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) +// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) +// newTime := getCurrentTimestampByMill() +// operationID := "" +// seqUid := "" +// msgListToMongo := make([]MsgInfo, 0) +// +// for _, m := range msgList { +// seqUid = getSeqUid(uid, m.MsgData.Seq) +// var err error +// sMsg := MsgInfo{} +// sMsg.SendTime = sendTime +// if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { +// return utils.Wrap(err, "") +// } +// msgListToMongo = append(msgListToMongo, sMsg) +// } +// +// filter := bson.M{"uid": seqUid} +// log.NewDebug(operationID, "filter ", seqUid) +// err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err() +// log.NewWarn(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime) +// if err != nil { +// sChat := UserChat{} +// sChat.UID = seqUid +// sChat.Msg = msgListToMongo +// +// if _, err = c.InsertOne(ctx, &sChat); err != nil { +// log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) +// return utils.Wrap(err, "") +// } +// } else { +// log.NewDebug(operationID, "FindOneAndUpdate ok", filter) +// } +// +// log.NewDebug(operationID, "find mgo uid cost time", getCurrentTimestampByMill()-newTime) +// return nil +//} + func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { var seqUid string newTime := getCurrentTimestampByMill()