diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 62abb2bc7..d29fa998a 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -3,6 +3,7 @@ package controller import ( "fmt" "sort" + "strconv" "sync" "time" @@ -141,6 +142,82 @@ func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversation return nil } +func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, msgList []*unRelationTb.MsgInfoModel, firstSeq int64) error { + if len(msgList) == 0 { + return nil + } + num := db.msg.GetSingleGocMsgNum() + if msgList[0].Msg != nil { + firstSeq = msgList[0].Msg.Seq + } + getDocID := func(seq int64) string { + return conversationID + ":" + strconv.FormatInt(seq/num, 10) + } + getIndex := func(seq int64) int64 { + return seq % num + } + // 返回值为true表示数据库存在该文档,false表示数据库不存在该文档 + updateMsgModel := func(docID string, index int64, msg *unRelationTb.MsgInfoModel) (bool, error) { + var ( + res *mongo.UpdateResult + err error + ) + if msg.Msg != nil { + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msgList[0].Msg) + } else if msg.Revoke != nil { + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msgList[0].Revoke) + } else if msg.DelList != nil { + res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msgList[0].DelList) + } else if msg.ReadList != nil { + res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msgList[0].ReadList) + } else { + return false, errs.ErrArgs.Wrap("msg all field is nil") + } + if err != nil { + return false, err + } + return res.MatchedCount > 0, nil + } + tryUpdate := true + for i := 0; i < len(msgList); i++ { + msg := msgList[i] + seq := firstSeq + int64(i) + docID := getDocID(seq) + if tryUpdate { + matched, err := updateMsgModel(docID, getIndex(seq), msg) + if err != nil { + return err + } + if matched { + continue + } + } + doc := unRelationTb.MsgDocModel{ + DocID: docID, + Msg: make([]unRelationTb.MsgInfoModel, num), + } + var insert int + for j := i; j < len(msgList); j++ { + if getDocID(seq) != docID { + break + } + insert++ + doc.Msg[getIndex(seq)] = *msgList[j] + } + if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { + if mongo.IsDuplicateKeyError(err) { + i-- + tryUpdate = true + continue + } + return err + } + tryUpdate = false + i += insert - 1 + } + return nil +} + func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { num := db.msg.GetSingleGocMsgNum() currentIndex := currentMaxSeq / num diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index d7cf3db86..00d2ac2ea 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -38,6 +38,7 @@ func Test_BatchInsertChat2DB(t *testing.T) { db := &commonMsgDatabase{ msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()), } + //ctx := context.Background() //msgs := make([]*sdkws.MsgData, 0, 1) //for i := 0; i < cap(msgs); i++ { diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 788c21d4d..8ea183b44 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -2,10 +2,10 @@ package unrelation import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "go.mongodb.org/mongo-driver/mongo" "strconv" "strings" - - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" ) const ( @@ -20,18 +20,56 @@ type MsgDocModel struct { Msg []MsgInfoModel `bson:"msgs"` } +type RevokeModel struct { + UserID string `bson:"user_id"` + Nickname string `bson:"nickname"` + Time int64 `bson:"time"` +} + +type OfflinePushModel struct { + Title string `bson:"title"` + Desc string `bson:"desc"` + Ex string `bson:"ex"` + IOSPushSound string `bson:"ios_push_sound"` + IOSBadgeCount bool `bson:"ios_badge_count"` +} + +type MsgDataModel struct { + SendID string `bson:"send_id"` + RecvID string `bson:"recv_id"` + GroupID string `bson:"group_id"` + ClientMsgID string `bson:"client_msg_id"` + ServerMsgID string `bson:"server_msg_id"` + SenderPlatformID int32 `bson:"sender_platform_id"` + SenderNickname string `bson:"sender_nickname"` + SenderFaceURL string `bson:"sender_face_url"` + SessionType int32 `bson:"session_type"` + MsgFrom int32 `bson:"msg_from"` + ContentType int32 `bson:"content_type"` + Content []byte `bson:"content"` + Seq int64 `bson:"seq"` + SendTime int64 `bson:"send_time"` + CreateTime int64 `bson:"create_time"` + Status int32 `bson:"status"` + Options map[string]bool `bson:"options"` + OfflinePush *OfflinePushModel `bson:"offline_push"` + AtUserIDList []string `bson:"at_user_id_list"` + AttachedInfo string `bson:"attached_info"` + Ex string `bson:"ex"` +} + type MsgInfoModel struct { - SendTime int64 `bson:"sendtime"` - Msg []byte `bson:"msg"` - Revoke bool `bson:"revoke"` - ReadList []string `bson:"read_list"` - DelList []string `bson:"del_list"` + Msg *MsgDataModel `bson:"msg"` + Revoke *RevokeModel `bson:"revoke"` + DelList []string `bson:"del_list"` + ReadList []string `bson:"read_list"` } type MsgDocModelInterface interface { PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error Create(ctx context.Context, model *MsgDocModel) error - UpdateMsg(ctx context.Context, docID string, index int64, info *MsgInfoModel) error + UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) + PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error IsExistDocID(ctx context.Context, docID string) (bool, error) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index b9ec543f2..e8edfe156 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -38,12 +38,41 @@ func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) e return err } -func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, info *table.MsgInfoModel) error { - _, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d", index): info}}) +func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) { + var field string + if key == "" { + field = fmt.Sprintf("msgs.%d", index) + } else { + field = fmt.Sprintf("msgs.%d.%s", index, key) + } + filter := bson.M{"doc_id": docID} + update := bson.M{"$set": bson.M{field: value}} + res, err := m.MsgCollection.UpdateOne(ctx, filter, update) if err != nil { - return utils.Wrap(err, "") + return nil, utils.Wrap(err, "") } - return nil + return res, nil +} + +// PushUnique value must slice +func (m *MsgMongoDriver) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) { + var field string + if key == "" { + field = fmt.Sprintf("msgs.%d", index) + } else { + field = fmt.Sprintf("msgs.%d.%s", index, key) + } + filter := bson.M{"doc_id": docID} + update := bson.M{ + "$addToSet": bson.M{ + field: bson.M{"$each": value}, + }, + } + res, err := m.MsgCollection.UpdateOne(ctx, filter, update) + if err != nil { + return nil, utils.Wrap(err, "") + } + return res, nil } func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {