From cf635918ae017298d28308a81eb64ddde042f06d Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 21 Mar 2024 18:14:24 +0800 Subject: [PATCH] feat: msg format --- internal/msggateway/hub_server.go | 4 +- internal/msggateway/n_ws_server.go | 6 +-- internal/rpc/msg/revoke.go | 4 +- internal/rpc/msg/statistics.go | 9 ++--- pkg/common/convert/msg.go | 10 ++--- pkg/common/db/controller/msg.go | 59 ++++++++++++++--------------- pkg/common/db/controller/user.go | 5 +-- pkg/common/db/mgo/msg.go | 5 +-- pkg/common/db/table/relation/msg.go | 3 ++ 9 files changed, 51 insertions(+), 54 deletions(-) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index a19626c96..e29fbfc93 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -36,9 +36,9 @@ func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, di return err } - msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) + tokenCacheModel := cache.NewTokenCacheModel(rdb) s.LongConnServer.SetDiscoveryRegistry(disCov, config) - s.LongConnServer.SetCacheHandler(msgModel) + s.LongConnServer.SetCacheHandler(tokenCacheModel) msggateway.RegisterMsgGatewayServer(server, s) return nil } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 5127b886b..cdc75c1e4 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -48,7 +48,7 @@ type LongConnServer interface { GetUserAllCons(userID string) ([]*Client, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) Validate(s any) error - SetCacheHandler(cache cache.MsgModel) + SetCacheHandler(cache cache.TokenModel) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) KickUserConn(client *Client) error UnRegister(c *Client) @@ -79,7 +79,7 @@ type WsServer struct { handshakeTimeout time.Duration writeBufferSize int validate *validator.Validate - cache cache.MsgModel + cache cache.TokenModel userClient *rpcclient.UserRpcClient disCov discoveryregistry.SvcDiscoveryRegistry Compressor @@ -119,7 +119,7 @@ func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, sta } } -func (ws *WsServer) SetCacheHandler(cache cache.MsgModel) { +func (ws *WsServer) SetCacheHandler(cache cache.TokenModel) { ws.cache = cache } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 212898ad6..fe0811f49 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -17,10 +17,10 @@ package msg import ( "context" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" - unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" @@ -93,7 +93,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. } } now := time.Now().UnixMilli() - err = m.MsgDatabase.RevokeMsg(ctx, req.ConversationID, req.Seq, &unrelationtb.RevokeModel{ + err = m.MsgDatabase.RevokeMsg(ctx, req.ConversationID, req.Seq, &relation.RevokeModel{ Role: role, UserID: req.UserID, Nickname: user.Nickname, diff --git a/internal/rpc/msg/statistics.go b/internal/rpc/msg/statistics.go index fcabdace5..b7f16ff9a 100644 --- a/internal/rpc/msg/statistics.go +++ b/internal/rpc/msg/statistics.go @@ -16,12 +16,11 @@ package msg import ( "context" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/utils" + "time" ) func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq) (*msg.GetActiveUserResp, error) { @@ -39,7 +38,7 @@ func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq } var pbUsers []*msg.ActiveUser if len(users) > 0 { - userIDs := utils.Slice(users, func(e *unrelation.UserCount) string { return e.UserID }) + userIDs := utils.Slice(users, func(e *relation.UserCount) string { return e.UserID }) userMap, err := m.UserLocalCache.GetUsersInfoMap(ctx, userIDs) if err != nil { return nil, err @@ -81,7 +80,7 @@ func (m *msgServer) GetActiveGroup(ctx context.Context, req *msg.GetActiveGroupR } var pbgroups []*msg.ActiveGroup if len(groups) > 0 { - groupIDs := utils.Slice(groups, func(e *unrelation.GroupCount) string { return e.GroupID }) + groupIDs := utils.Slice(groups, func(e *relation.GroupCount) string { return e.GroupID }) resp, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs) if err != nil { return nil, err diff --git a/pkg/common/convert/msg.go b/pkg/common/convert/msg.go index ab5193ab6..594a0ffc2 100644 --- a/pkg/common/convert/msg.go +++ b/pkg/common/convert/msg.go @@ -15,16 +15,16 @@ package convert import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" ) -func MsgPb2DB(msg *sdkws.MsgData) *unrelation.MsgDataModel { +func MsgPb2DB(msg *sdkws.MsgData) *relation.MsgDataModel { if msg == nil { return nil } - var msgDataModel unrelation.MsgDataModel + var msgDataModel relation.MsgDataModel msgDataModel.SendID = msg.SendID msgDataModel.RecvID = msg.RecvID msgDataModel.GroupID = msg.GroupID @@ -43,7 +43,7 @@ func MsgPb2DB(msg *sdkws.MsgData) *unrelation.MsgDataModel { msgDataModel.Status = msg.Status msgDataModel.Options = msg.Options if msg.OfflinePushInfo != nil { - msgDataModel.OfflinePush = &unrelation.OfflinePushModel{ + msgDataModel.OfflinePush = &relation.OfflinePushModel{ Title: msg.OfflinePushInfo.Title, Desc: msg.OfflinePushInfo.Desc, Ex: msg.OfflinePushInfo.Ex, @@ -57,7 +57,7 @@ func MsgPb2DB(msg *sdkws.MsgData) *unrelation.MsgDataModel { return &msgDataModel } -func MsgDB2Pb(msgModel *unrelation.MsgDataModel) *sdkws.MsgData { +func MsgDB2Pb(msgModel *relation.MsgDataModel) *sdkws.MsgData { if msgModel == nil { return nil } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 1c8bafc75..9fefcaafb 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -19,13 +19,12 @@ import ( "encoding/json" "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/protocol/constant" @@ -48,7 +47,7 @@ type CommonMsgDatabase interface { // BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation. BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error // RevokeMsg revokes a message in a conversation. - RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unrelationtb.RevokeModel) error + RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *relation.RevokeModel) error // MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers. MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMessagesFromCache deletes message caches from Redis by sequence numbers. @@ -101,12 +100,12 @@ type CommonMsgDatabase interface { MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error) MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error - RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*unrelationtb.UserCount, dateCount map[string]int64, err error) - RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unrelationtb.GroupCount, dateCount map[string]int64, err error) + RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) + RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } -func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { +func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { producerConfig := &kafka.ProducerConfig{ ProducerAck: kafkaConf.ProducerAck, CompressType: kafkaConf.CompressType, @@ -155,8 +154,8 @@ func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, } type commonMsgDatabase struct { - msgDocDatabase unrelationtb.MsgDocModelInterface - msg unrelationtb.MsgDocModel + msgDocDatabase relation.MsgDocModelInterface + msg relation.MsgDocModel cache cache.MsgModel producer *kafka.Producer producerToMongo *kafka.Producer @@ -204,13 +203,13 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI var ok bool switch key { case updateKeyMsg: - var msg *unrelationtb.MsgDataModel - msg, ok = field.(*unrelationtb.MsgDataModel) + var msg *relation.MsgDataModel + msg, ok = field.(*relation.MsgDataModel) if msg != nil && msg.Seq != firstSeq+int64(i) { return errs.ErrInternalServer.WrapMsg("seq is invalid") } case updateKeyRevoke: - _, ok = field.(*unrelationtb.RevokeModel) + _, ok = field.(*relation.RevokeModel) default: return errs.ErrInternalServer.WrapMsg("key is invalid") } @@ -250,9 +249,9 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI continue // The current data has been updated, skip the current data } } - doc := unrelationtb.MsgDocModel{ + doc := relation.MsgDocModel{ DocID: db.msg.GetDocID(conversationID, seq), - Msg: make([]*unrelationtb.MsgInfoModel, num), + Msg: make([]*relation.MsgInfoModel, num), } var insert int // Inserted data number for j := i; j < len(fields); j++ { @@ -263,18 +262,18 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI insert++ switch key { case updateKeyMsg: - doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{ - Msg: fields[j].(*unrelationtb.MsgDataModel), + doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{ + Msg: fields[j].(*relation.MsgDataModel), } case updateKeyRevoke: - doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{ - Revoke: fields[j].(*unrelationtb.RevokeModel), + doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{ + Revoke: fields[j].(*relation.RevokeModel), } } } for i, model := range doc.Msg { if model == nil { - model = &unrelationtb.MsgInfoModel{} + model = &relation.MsgInfoModel{} doc.Msg[i] = model } if model.DelList == nil { @@ -304,9 +303,9 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio if msg == nil { continue } - var offlinePushModel *unrelationtb.OfflinePushModel + var offlinePushModel *relation.OfflinePushModel if msg.OfflinePushInfo != nil { - offlinePushModel = &unrelationtb.OfflinePushModel{ + offlinePushModel = &relation.OfflinePushModel{ Title: msg.OfflinePushInfo.Title, Desc: msg.OfflinePushInfo.Desc, Ex: msg.OfflinePushInfo.Ex, @@ -314,7 +313,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount, } } - msgs[i] = &unrelationtb.MsgDataModel{ + msgs[i] = &relation.MsgDataModel{ SendID: msg.SendID, RecvID: msg.RecvID, GroupID: msg.GroupID, @@ -341,7 +340,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) } -func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unrelationtb.RevokeModel) error { +func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *relation.RevokeModel) error { return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) } @@ -428,7 +427,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat return totalMsgs, nil } -func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*unrelationtb.MsgInfoModel, userID, conversationID string, msg *unrelationtb.MsgInfoModel) { +func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*relation.MsgInfoModel, userID, conversationID string, msg *relation.MsgInfoModel) { if msg.IsRead { msg.Msg.IsRead = true } @@ -450,7 +449,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification { return } - var msgs []*unrelationtb.MsgInfoModel + var msgs []*relation.MsgInfoModel if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok { msgs = v } else { @@ -484,12 +483,12 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ } } -func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) { +func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*relation.MsgInfoModel, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) if err != nil { return nil, err } - tempCache := make(map[int64][]*unrelationtb.MsgInfoModel) + tempCache := make(map[int64][]*relation.MsgInfoModel) for _, msg := range msgs { db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg) } @@ -751,7 +750,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) if err != nil || msgDocModel.DocID == "" { if err != nil { - if err == unrelation.ErrMsgListNotExist { + if err == relation.ErrMsgListNotExist { log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index) } else { log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) @@ -818,7 +817,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) if err != nil || msgDocModel.DocID == "" { if err != nil { - if err == unrelation.ErrMsgListNotExist { + if err == relation.ErrMsgListNotExist { log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) } else { log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) @@ -1036,7 +1035,7 @@ func (db *commonMsgDatabase) RangeUserSendCount( ase bool, pageNumber int32, showNumber int32, -) (msgCount int64, userCount int64, users []*unrelationtb.UserCount, dateCount map[string]int64, err error) { +) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber) } @@ -1047,7 +1046,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount( ase bool, pageNumber int32, showNumber int32, -) (msgCount int64, userCount int64, groups []*unrelationtb.GroupCount, dateCount map[string]int64, err error) { +) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) } diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 9f0845c08..37619ace1 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -26,7 +26,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" ) type UserDatabase interface { @@ -87,10 +86,10 @@ type userDatabase struct { tx tx.CtxTx userDB relation.UserModelInterface cache cache.UserCache - mongoDB unrelationtb.SubscribeUserModelInterface + mongoDB relation.SubscribeUserModelInterface } -func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB unrelationtb.SubscribeUserModelInterface) UserDatabase { +func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB relation.SubscribeUserModelInterface) UserDatabase { return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB} } diff --git a/pkg/common/db/mgo/msg.go b/pkg/common/db/mgo/msg.go index 23ad10a7a..97aa56531 100644 --- a/pkg/common/db/mgo/msg.go +++ b/pkg/common/db/mgo/msg.go @@ -2,7 +2,6 @@ package mgo import ( "context" - "errors" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/protocol/constant" @@ -19,8 +18,6 @@ import ( "time" ) -var ErrMsgListNotExist = errors.New("user not have msg in mongoDB") - func NewMsgMongo(db *mongo.Database) (relation.MsgDocModelInterface, error) { coll := db.Collection(new(relation.MsgDocModel).TableName()) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ @@ -227,7 +224,7 @@ func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID strin if len(msgs) > 0 { return msgs[0], nil } - return nil, errs.Wrap(ErrMsgListNotExist) + return nil, errs.Wrap(relation.ErrMsgListNotExist) } func (m *MsgMgo) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error { diff --git a/pkg/common/db/table/relation/msg.go b/pkg/common/db/table/relation/msg.go index 8adfa4370..eb1e2444f 100644 --- a/pkg/common/db/table/relation/msg.go +++ b/pkg/common/db/table/relation/msg.go @@ -16,6 +16,7 @@ package relation import ( "context" + "errors" "strconv" "time" @@ -32,6 +33,8 @@ const ( NewestList = -1 ) +var ErrMsgListNotExist = errors.New("user not have msg in mongoDB") + type MsgDocModel struct { DocID string `bson:"doc_id"` Msg []*MsgInfoModel `bson:"msgs"`