From be01c4da0c9e1a3f03d6a3f9f19174071e1b525f Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 7 Dec 2022 21:12:48 +0800 Subject: [PATCH 1/2] dek jinzhu --- config/config.yaml | 4 + go.mod | 1 - go.sum | 2 - internal/api/client_init/init_config.go | 10 +-- internal/demo/register/ip_limit.go | 1 - .../logic/online_msg_to_mongo_handler.go | 2 +- internal/rpc/msg/callback.go | 6 ++ pkg/common/config/config.go | 1 + pkg/common/constant/constant.go | 4 + pkg/common/db/extend_msg_mongo_model.go | 79 +++++++++++++------ .../im_mysql_model/friend_model.go | 2 - .../im_mysql_model/invitation_model.go | 5 -- pkg/common/db/rocks_cache/rocks_cache.go | 2 +- pkg/proto/msg/msg.proto | 24 ++++++ pkg/proto/sdk_ws/ws.proto | 26 +++++- 15 files changed, 126 insertions(+), 43 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 2406108a3..966c613f4 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -345,6 +345,10 @@ callback: enable: false callbackTimeOut: 2 callbackFailedContinue: true # 回调超时是否继续 + callbackBeforeExtendMsgModify: + enable: false + callbackTimeOut: 2 + callbackFailedContinue: true # 回调超时是否继续 notification: groupCreated: diff --git a/go.mod b/go.mod index 3c231e7ff..330008db8 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/jinzhu/copier v0.3.4 - github.com/jinzhu/gorm v1.9.16 github.com/jonboulle/clockwork v0.2.2 // indirect github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/lestrrat-go/strftime v1.0.4 // indirect diff --git a/go.sum b/go.sum index d2621af33..c3a219fcb 100644 --- a/go.sum +++ b/go.sum @@ -368,8 +368,6 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jinzhu/copier v0.3.4 h1:mfU6jI9PtCeUjkjQ322dlff9ELjGDu975C2p/nrubVI= github.com/jinzhu/copier v0.3.4/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= -github.com/jinzhu/gorm v1.9.16 h1:+IyIjPEABKRpsu/F8OvDPy9fyQlgsg2luMV2ZIH5i5o= -github.com/jinzhu/gorm v1.9.16/go.mod h1:G3LB3wezTOWM2ITLzPxEXgSkOXAntiLHS7UdBefADcs= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= diff --git a/internal/api/client_init/init_config.go b/internal/api/client_init/init_config.go index de08fc90c..2138603c4 100644 --- a/internal/api/client_init/init_config.go +++ b/internal/api/client_init/init_config.go @@ -9,7 +9,6 @@ import ( "net/http" "github.com/gin-gonic/gin" - "github.com/jinzhu/gorm" ) func SetClientInitConfig(c *gin.Context) { @@ -62,11 +61,10 @@ func GetClientInitConfig(c *gin.Context) { } config, err := imdb.GetClientInitConfig() if err != nil { - if !gorm.IsRecordNotFoundError(err) { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) - c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()}) - return - } + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()}) + return + } resp.Data.DiscoverPageURL = config.DiscoverPageURL log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp ", resp) diff --git a/internal/demo/register/ip_limit.go b/internal/demo/register/ip_limit.go index a94e51e74..401927613 100644 --- a/internal/demo/register/ip_limit.go +++ b/internal/demo/register/ip_limit.go @@ -10,7 +10,6 @@ import ( "github.com/gin-gonic/gin" - //"github.com/jinzhu/gorm" "net/http" "time" ) diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go index f08b59965..11bc399ad 100644 --- a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go +++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go @@ -61,7 +61,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil { log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList) } - + //if v.MsgData.ContentType == ? {} } } } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 5fae1e133..a927e7afc 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -221,3 +221,9 @@ func callbackMsgModify(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content)) return callbackResp } + +func CallbackBeforeExtendMsgModify() cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} + + return callbackResp +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 2adf33b47..e1942a66d 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -292,6 +292,7 @@ type config struct { CallbackBeforeAddFriend callBackConfig `yaml:"callbackBeforeAddFriend"` CallbackBeforeCreateGroup callBackConfig `yaml:"callbackBeforeCreateGroup"` CallbackBeforeMemberJoinGroup callBackConfig `yaml:"callbackBeforeMemberJoinGroup"` + CallbackBeforeExtendMsgModify callBackConfig `yaml:"callbackBeforeExtendMsgModify"` } `yaml:"callback"` Notification struct { ///////////////////////group///////////////////////////// diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 719541316..ff2e0097f 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -112,6 +112,10 @@ const ( WorkMomentNotificationBegin = 1900 WorkMomentNotification = 1901 + BusinessNotificationBegin = 2000 + BusinessNotification = 2001 + BusinessNotificationEnd = 2099 + NotificationEnd = 3000 //status diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go index 7ee7c0650..f25731aea 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo/options" "strconv" "time" @@ -15,24 +16,26 @@ import ( const cExtendMsgSet = "extend_msg_set" type ExtendMsgSet struct { - ID string `bson:"id" json:"ID"` - ExtendMsgs []*ExtendMsg `bson:"extend_msgs" json:"extendMsgs"` - LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` - AttachedInfo *string `bson:"attached_info" json:"attachedInfo"` - Ex *string `bson:"ex" json:"ex"` - ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` - CreateTime int32 `bson:"create_time" json:"createTime"` + ID string `bson:"id" json:"ID"` + ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"` + LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` + AttachedInfo *string `bson:"attached_info" json:"attachedInfo"` + Ex *string `bson:"ex" json:"ex"` + ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` + CreateTime int32 `bson:"create_time" json:"createTime"` } type ReactionExtendMsgSet struct { - UserKey string `bson:"user_key" json:"userKey"` - Value string `bson:"value" json:"value"` + UserKey string `bson:"user_key" json:"userKey"` + Value string `bson:"value" json:"value"` + LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` } type ExtendMsg struct { - Content []*ReactionExtendMsgSet `bson:"content" json:"content"` - ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` - CreateTime int32 `bson:"create_time" json:"createTime"` + Content []*ReactionExtendMsgSet `bson:"content" json:"content"` + ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` + CreateTime int32 `bson:"create_time" json:"createTime"` + LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` } func GetExtendMsgSetID(ID string, index int32) string { @@ -46,11 +49,22 @@ func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error { return err } -func (d *DataBases) GetAllExtendMsgSet(ID string) (sets []*ExtendMsgSet, err error) { +type GetAllExtendMsgSetOpts struct { + ExcludeExtendMsgs bool +} + +func (d *DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) regex := fmt.Sprintf("^%s", ID) - cursor, err := c.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: regex}}) + var findOpts *options.FindOptions + if opts != nil { + if opts.ExcludeExtendMsgs { + findOpts = &options.FindOptions{} + findOpts.SetProjection(bson.M{"extend_msgs": 0}) + } + } + cursor, err := c.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: regex}}, findOpts) if err != nil { return nil, utils.Wrap(err, "") } @@ -62,43 +76,62 @@ func (d *DataBases) GetAllExtendMsgSet(ID string) (sets []*ExtendMsgSet, err err } type GetExtendMsgSetOpts struct { - IncludeExtendMsgs bool + ExcludeExtendMsgs bool } func (d *DataBases) GetExtendMsgSet(ID string, index int32, opts *GetExtendMsgSetOpts) (*ExtendMsgSet, error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) var set ExtendMsgSet - err := c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}).Decode(&set) + var findOneOpt *options.FindOneOptions + if opts != nil { + if opts.ExcludeExtendMsgs { + findOneOpt = &options.FindOneOptions{} + findOneOpt.SetProjection(bson.M{"extend_msgs": 0}) + } + } + err := c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, findOneOpt).Decode(&set) return &set, err } -func (d *DataBases) InsertExtendMsg(ID string, index int32, msg *ExtendMsg) (msgIndex int32, err error) { +func (d *DataBases) InsertExtendMsgAndGetIndex(ID string, index int32, msg *ExtendMsg) (msgIndex int32, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - result := c.FindOneAndUpdate(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"create_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, "$push": bson.M{"extend_msgs": msg}}}) + result := c.FindOneAndUpdate(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, "$push": bson.M{"extend_msgs": msg}}}) set := &ExtendMsgSet{} err = result.Decode(set) return set.ExtendMsgNum, err } -func (d *DataBases) UpdateOneExtendMsgSet(ID string, index, MsgIndex int32, userIndex string, value string) error { +func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index, msgIndex int32, userID, value string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{}) + reactionExtendMsgSet := ReactionExtendMsgSet{ + UserKey: userID, + Value: value, + LatestUpdateTime: int32(utils.GetCurrentTimestampBySecond()), + } + upsert := true + opt := &options.UpdateOptions{ + Upsert: &upsert, + } + //s := fmt.Sprintf("extend_msgs.%d.content", msgIndex) + _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": msgIndex}}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond()}, "&push": bson.M{"content": &reactionExtendMsgSet}}, opt) return err } -func (d *DataBases) DelOneExtendMsgSetUserKey(ID string, index, MsgIndex int32, userIndex string, userID string) error { +func (d *DataBases) DeleteReactionExtendMsgSet(ID string, index, msgIndex int32, userID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - _, err := c.DeleteOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}) + //s := fmt.Sprintf("extend_msgs.%d.content", msgIndex) + _, err := c.DeleteOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": msgIndex}}) return err } +// by index start end func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) (extendMsgList []*ExtendMsg, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}).Decode(&extendMsgList) + err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": []int32{msgStartIndex, msgEndIndex}}}).Decode(&extendMsgList) return extendMsgList, err } diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go b/pkg/common/db/mysql_model/im_mysql_model/friend_model.go index 7cae9f9d0..4e4340d84 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/friend_model.go @@ -4,8 +4,6 @@ import ( "Open_IM/pkg/common/db" "fmt" "time" - - _ "github.com/jinzhu/gorm/dialects/mysql" ) func InsertToFriend(toInsertFollow *db.Friend) error { diff --git a/pkg/common/db/mysql_model/im_mysql_model/invitation_model.go b/pkg/common/db/mysql_model/im_mysql_model/invitation_model.go index ddbe974c5..80d2f768a 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/invitation_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/invitation_model.go @@ -5,8 +5,6 @@ import ( "errors" "math/rand" "time" - - "github.com/jinzhu/gorm" ) /** @@ -89,9 +87,6 @@ func GetInvitationCode(code string) (*db.Invitation, error) { InvitationCode: code, } err := db.DB.MysqlDB.DefaultGormDB().Model(invitation).Find(invitation).Error - if gorm.IsRecordNotFoundError(err) { - return invitation, nil - } return invitation, err } diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index e95557ad3..f5b98869c 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -569,7 +569,7 @@ func DelConversationFromCache(ownerUserID, conversationID string) error { func GetExtendMsgSetFromCache(ID string, index int32) (*db.ExtendMsgSet, error) { getExtendMsgSet := func() (string, error) { - extendMsgSet, err := db.DB.GetExtendMsgSet(ID, index, &db.GetExtendMsgSetOpts{IncludeExtendMsgs: false}) + extendMsgSet, err := db.DB.GetExtendMsgSet(ID, index, &db.GetExtendMsgSetOpts{ExcludeExtendMsgs: false}) if err != nil { return "", utils.Wrap(err, "GetExtendMsgSet failed") } diff --git a/pkg/proto/msg/msg.proto b/pkg/proto/msg/msg.proto index c32a4ed47..38144eec9 100644 --- a/pkg/proto/msg/msg.proto +++ b/pkg/proto/msg/msg.proto @@ -1,5 +1,6 @@ syntax = "proto3"; import "Open-IM-Server/pkg/proto/sdk_ws/ws.proto"; +import "Open-IM-Server/pkg/proto/sdk_ws/wrappers.proto"; option go_package = "Open_IM/pkg/proto/msg;msg"; package msg; @@ -153,6 +154,26 @@ message GetWriteDiffMsgResp{ server_api_params.MsgData msgData = 3; } +message ModifyMsgReq { + string operationID = 1; + string ID = 2; + int32 Index = 3; + int32 msgIndex = 4; + string opUserID = 5; + string userID = 6; + bool isFirstModify = 7; + string clientMsgID = 8; + google.protobuf.StringValue value = 9; + google.protobuf.StringValue ex = 10; + google.protobuf.StringValue attachedInfo = 11; +} + +message ModifyMsgResp { + int32 errCode = 1; + string errMsg = 2; + int32 Index = 4; + int32 msgIndex = 3; +} service msg { rpc GetMaxAndMinSeq(server_api_params.GetMaxAndMinSeqReq) returns(server_api_params.GetMaxAndMinSeqResp); @@ -166,4 +187,7 @@ service msg { rpc GetSendMsgStatus(GetSendMsgStatusReq) returns(GetSendMsgStatusResp); rpc GetSuperGroupMsg(GetSuperGroupMsgReq) returns(GetSuperGroupMsgResp); rpc GetWriteDiffMsg(GetWriteDiffMsgReq) returns(GetWriteDiffMsgResp); + + // modify msg + rpc ModifyMsg(ModifyMsgReq) returns(ModifyMsgResp); } diff --git a/pkg/proto/sdk_ws/ws.proto b/pkg/proto/sdk_ws/ws.proto index e7b3932b1..b8c232965 100644 --- a/pkg/proto/sdk_ws/ws.proto +++ b/pkg/proto/sdk_ws/ws.proto @@ -704,4 +704,28 @@ message SetAppBackgroundStatusReq { message SetAppBackgroundStatusResp { int32 errCode = 1; string errMsg = 2; -} \ No newline at end of file +} + +message ExtendMsgSet { + string ID = 1; + repeated ExtendMsg extendMsgs = 2; + int32 latestUpdateTime = 3; + string attachedInfo = 4; + string ex = 5; + int32 extendMsgNum = 6; + int32 createTime = 7; +} + +message ExtendMsg { + repeated ReactionExtendMsgSet content = 1; + string clientMsgID = 2; + int32 createTime = 3; + int32 latestUpdateTime = 4; +} + +message ReactionExtendMsgSet { + string userKey = 1; + string value = 2; + int32 latestUpdateTime = 3; +} + From 0e726a20ba079745bd96b2df56fa1d34eb92b7fd Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 8 Dec 2022 12:54:39 +0800 Subject: [PATCH 2/2] modify --- internal/rpc/msg/callback.go | 2 +- pkg/common/db/extend_msg_mongo_model.go | 36 +++++++++++------------- pkg/common/db/rocks_cache/rocks_cache.go | 15 ++++------ 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index a927e7afc..b4ae38418 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -223,7 +223,7 @@ func callbackMsgModify(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { } func CallbackBeforeExtendMsgModify() cbApi.CommonCallbackResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} + callbackResp := cbApi.CommonCallbackResp{OperationID: ""} return callbackResp } diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go index f25731aea..45e97ee25 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -13,7 +13,7 @@ import ( "go.mongodb.org/mongo-driver/bson" ) -const cExtendMsgSet = "extend_msg_set" +const cExtendMsgSet = "extend_msgs" type ExtendMsgSet struct { ID string `bson:"id" json:"ID"` @@ -32,10 +32,10 @@ type ReactionExtendMsgSet struct { } type ExtendMsg struct { - Content []*ReactionExtendMsgSet `bson:"content" json:"content"` - ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` - CreateTime int32 `bson:"create_time" json:"createTime"` - LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` + Content map[string]ReactionExtendMsgSet `bson:"content" json:"content"` + ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` + CreateTime int32 `bson:"create_time" json:"createTime"` + LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` } func GetExtendMsgSetID(ID string, index int32) string { @@ -94,16 +94,16 @@ func (d *DataBases) GetExtendMsgSet(ID string, index int32, opts *GetExtendMsgSe return &set, err } -func (d *DataBases) InsertExtendMsgAndGetIndex(ID string, index int32, msg *ExtendMsg) (msgIndex int32, err error) { +// first modify msg +func (d *DataBases) InsertExtendMsgAndGetIndex(ID string, index int32, msg *ExtendMsg) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - result := c.FindOneAndUpdate(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, "$push": bson.M{"extend_msgs": msg}}}) - set := &ExtendMsgSet{} - err = result.Decode(set) - return set.ExtendMsgNum, err + _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) + return err } -func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index, msgIndex int32, userID, value string) error { +// insert or update +func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index int32, clientMsgID, userID, value string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) reactionExtendMsgSet := ReactionExtendMsgSet{ @@ -115,23 +115,21 @@ func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index, msgInde opt := &options.UpdateOptions{ Upsert: &upsert, } - //s := fmt.Sprintf("extend_msgs.%d.content", msgIndex) - _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": msgIndex}}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond()}, "&push": bson.M{"content": &reactionExtendMsgSet}}, opt) + _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond()}, fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, userID): reactionExtendMsgSet}, opt) return err } -func (d *DataBases) DeleteReactionExtendMsgSet(ID string, index, msgIndex int32, userID string) error { +func (d *DataBases) DeleteReactionExtendMsgSet(ID string, index int32, clientMsgID, userID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - //s := fmt.Sprintf("extend_msgs.%d.content", msgIndex) - _, err := c.DeleteOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": msgIndex}}) + _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$unset": bson.M{}}) return err } // by index start end -func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) (extendMsgList []*ExtendMsg, err error) { +func (d *DataBases) GetExtendMsgList(ID string, index int32, clientMsgID string) (extendMsg *ExtendMsg, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": []int32{msgStartIndex, msgEndIndex}}}).Decode(&extendMsgList) - return extendMsgList, err + err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{}}).Decode(&extendMsg) + return extendMsg, err } diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index f5b98869c..3edb9f238 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -595,23 +595,20 @@ func DelExtendMsgSetFromCache(ID string, index int32) error { return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgSetCache+db.GetExtendMsgSetID(ID, index)), "DelExtendMsgSetFromCache err") } -func GetExtendMsg(ID string, index, extendMsgIndex int32) (*db.ExtendMsg, error) { +func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, error) { getExtendMsg := func() (string, error) { - extendMsg, err := db.DB.GetExtendMsgList(ID, index, extendMsgIndex, extendMsgIndex+1) + extendMsg, err := db.DB.GetExtendMsgList(ID, index, clientMsgID) if err != nil { return "", utils.Wrap(err, "GetExtendMsgList failed") } - if len(extendMsg) == 0 { - return "", nil - } - bytes, err := json.Marshal(extendMsg[0]) + bytes, err := json.Marshal(extendMsg) if err != nil { return "", utils.Wrap(err, "Marshal failed") } return string(bytes), nil } - extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex)), time.Second*30*60, getExtendMsg) + extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID, time.Second*30*60, getExtendMsg) if err != nil { return nil, utils.Wrap(err, "Fetch failed") } @@ -623,6 +620,6 @@ func GetExtendMsg(ID string, index, extendMsgIndex int32) (*db.ExtendMsg, error) return extendMsg, nil } -func DelExtendMsg(ID string, index, extendMsgIndex int32) error { - return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex))), "DelExtendMsg err") +func DelExtendMsg(ID string, index int32, clientMsgID string) error { + return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID), "DelExtendMsg err") }