feat: msg format

pull/2100/head
withchao 2 years ago
parent 32b7738112
commit cf635918ae

@ -36,9 +36,9 @@ func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, di
return err
}
msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
tokenCacheModel := cache.NewTokenCacheModel(rdb)
s.LongConnServer.SetDiscoveryRegistry(disCov, config)
s.LongConnServer.SetCacheHandler(msgModel)
s.LongConnServer.SetCacheHandler(tokenCacheModel)
msggateway.RegisterMsgGatewayServer(server, s)
return nil
}

@ -48,7 +48,7 @@ type LongConnServer interface {
GetUserAllCons(userID string) ([]*Client, bool)
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
Validate(s any) error
SetCacheHandler(cache cache.MsgModel)
SetCacheHandler(cache cache.TokenModel)
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig)
KickUserConn(client *Client) error
UnRegister(c *Client)
@ -79,7 +79,7 @@ type WsServer struct {
handshakeTimeout time.Duration
writeBufferSize int
validate *validator.Validate
cache cache.MsgModel
cache cache.TokenModel
userClient *rpcclient.UserRpcClient
disCov discoveryregistry.SvcDiscoveryRegistry
Compressor
@ -119,7 +119,7 @@ func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, sta
}
}
func (ws *WsServer) SetCacheHandler(cache cache.MsgModel) {
func (ws *WsServer) SetCacheHandler(cache cache.TokenModel) {
ws.cache = cache
}

@ -17,10 +17,10 @@ package msg
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
@ -93,7 +93,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
}
}
now := time.Now().UnixMilli()
err = m.MsgDatabase.RevokeMsg(ctx, req.ConversationID, req.Seq, &unrelationtb.RevokeModel{
err = m.MsgDatabase.RevokeMsg(ctx, req.ConversationID, req.Seq, &relation.RevokeModel{
Role: role,
UserID: req.UserID,
Nickname: user.Nickname,

@ -16,12 +16,11 @@ package msg
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/utils"
"time"
)
func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq) (*msg.GetActiveUserResp, error) {
@ -39,7 +38,7 @@ func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq
}
var pbUsers []*msg.ActiveUser
if len(users) > 0 {
userIDs := utils.Slice(users, func(e *unrelation.UserCount) string { return e.UserID })
userIDs := utils.Slice(users, func(e *relation.UserCount) string { return e.UserID })
userMap, err := m.UserLocalCache.GetUsersInfoMap(ctx, userIDs)
if err != nil {
return nil, err
@ -81,7 +80,7 @@ func (m *msgServer) GetActiveGroup(ctx context.Context, req *msg.GetActiveGroupR
}
var pbgroups []*msg.ActiveGroup
if len(groups) > 0 {
groupIDs := utils.Slice(groups, func(e *unrelation.GroupCount) string { return e.GroupID })
groupIDs := utils.Slice(groups, func(e *relation.GroupCount) string { return e.GroupID })
resp, err := m.GroupLocalCache.GetGroupInfos(ctx, groupIDs)
if err != nil {
return nil, err

@ -15,16 +15,16 @@
package convert
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
)
func MsgPb2DB(msg *sdkws.MsgData) *unrelation.MsgDataModel {
func MsgPb2DB(msg *sdkws.MsgData) *relation.MsgDataModel {
if msg == nil {
return nil
}
var msgDataModel unrelation.MsgDataModel
var msgDataModel relation.MsgDataModel
msgDataModel.SendID = msg.SendID
msgDataModel.RecvID = msg.RecvID
msgDataModel.GroupID = msg.GroupID
@ -43,7 +43,7 @@ func MsgPb2DB(msg *sdkws.MsgData) *unrelation.MsgDataModel {
msgDataModel.Status = msg.Status
msgDataModel.Options = msg.Options
if msg.OfflinePushInfo != nil {
msgDataModel.OfflinePush = &unrelation.OfflinePushModel{
msgDataModel.OfflinePush = &relation.OfflinePushModel{
Title: msg.OfflinePushInfo.Title,
Desc: msg.OfflinePushInfo.Desc,
Ex: msg.OfflinePushInfo.Ex,
@ -57,7 +57,7 @@ func MsgPb2DB(msg *sdkws.MsgData) *unrelation.MsgDataModel {
return &msgDataModel
}
func MsgDB2Pb(msgModel *unrelation.MsgDataModel) *sdkws.MsgData {
func MsgDB2Pb(msgModel *relation.MsgDataModel) *sdkws.MsgData {
if msgModel == nil {
return nil
}

@ -19,13 +19,12 @@ import (
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/protocol/constant"
@ -48,7 +47,7 @@ type CommonMsgDatabase interface {
// BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation.
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
// RevokeMsg revokes a message in a conversation.
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unrelationtb.RevokeModel) error
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *relation.RevokeModel) error
// MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers.
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMessagesFromCache deletes message caches from Redis by sequence numbers.
@ -101,12 +100,12 @@ type CommonMsgDatabase interface {
MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*unrelationtb.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unrelationtb.GroupCount, dateCount map[string]int64, err error)
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
}
func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
producerConfig := &kafka.ProducerConfig{
ProducerAck: kafkaConf.ProducerAck,
CompressType: kafkaConf.CompressType,
@ -155,8 +154,8 @@ func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database,
}
type commonMsgDatabase struct {
msgDocDatabase unrelationtb.MsgDocModelInterface
msg unrelationtb.MsgDocModel
msgDocDatabase relation.MsgDocModelInterface
msg relation.MsgDocModel
cache cache.MsgModel
producer *kafka.Producer
producerToMongo *kafka.Producer
@ -204,13 +203,13 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
var ok bool
switch key {
case updateKeyMsg:
var msg *unrelationtb.MsgDataModel
msg, ok = field.(*unrelationtb.MsgDataModel)
var msg *relation.MsgDataModel
msg, ok = field.(*relation.MsgDataModel)
if msg != nil && msg.Seq != firstSeq+int64(i) {
return errs.ErrInternalServer.WrapMsg("seq is invalid")
}
case updateKeyRevoke:
_, ok = field.(*unrelationtb.RevokeModel)
_, ok = field.(*relation.RevokeModel)
default:
return errs.ErrInternalServer.WrapMsg("key is invalid")
}
@ -250,9 +249,9 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
continue // The current data has been updated, skip the current data
}
}
doc := unrelationtb.MsgDocModel{
doc := relation.MsgDocModel{
DocID: db.msg.GetDocID(conversationID, seq),
Msg: make([]*unrelationtb.MsgInfoModel, num),
Msg: make([]*relation.MsgInfoModel, num),
}
var insert int // Inserted data number
for j := i; j < len(fields); j++ {
@ -263,18 +262,18 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
insert++
switch key {
case updateKeyMsg:
doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{
Msg: fields[j].(*unrelationtb.MsgDataModel),
doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{
Msg: fields[j].(*relation.MsgDataModel),
}
case updateKeyRevoke:
doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{
Revoke: fields[j].(*unrelationtb.RevokeModel),
doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{
Revoke: fields[j].(*relation.RevokeModel),
}
}
}
for i, model := range doc.Msg {
if model == nil {
model = &unrelationtb.MsgInfoModel{}
model = &relation.MsgInfoModel{}
doc.Msg[i] = model
}
if model.DelList == nil {
@ -304,9 +303,9 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
if msg == nil {
continue
}
var offlinePushModel *unrelationtb.OfflinePushModel
var offlinePushModel *relation.OfflinePushModel
if msg.OfflinePushInfo != nil {
offlinePushModel = &unrelationtb.OfflinePushModel{
offlinePushModel = &relation.OfflinePushModel{
Title: msg.OfflinePushInfo.Title,
Desc: msg.OfflinePushInfo.Desc,
Ex: msg.OfflinePushInfo.Ex,
@ -314,7 +313,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
}
}
msgs[i] = &unrelationtb.MsgDataModel{
msgs[i] = &relation.MsgDataModel{
SendID: msg.SendID,
RecvID: msg.RecvID,
GroupID: msg.GroupID,
@ -341,7 +340,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
}
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unrelationtb.RevokeModel) error {
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *relation.RevokeModel) error {
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
}
@ -428,7 +427,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
return totalMsgs, nil
}
func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*unrelationtb.MsgInfoModel, userID, conversationID string, msg *unrelationtb.MsgInfoModel) {
func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*relation.MsgInfoModel, userID, conversationID string, msg *relation.MsgInfoModel) {
if msg.IsRead {
msg.Msg.IsRead = true
}
@ -450,7 +449,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][
if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification {
return
}
var msgs []*unrelationtb.MsgInfoModel
var msgs []*relation.MsgInfoModel
if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok {
msgs = v
} else {
@ -484,12 +483,12 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][
}
}
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) {
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*relation.MsgInfoModel, err error) {
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
if err != nil {
return nil, err
}
tempCache := make(map[int64][]*unrelationtb.MsgInfoModel)
tempCache := make(map[int64][]*relation.MsgInfoModel)
for _, msg := range msgs {
db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg)
}
@ -751,7 +750,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
if err == relation.ErrMsgListNotExist {
log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
@ -818,7 +817,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
if err == relation.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
@ -1036,7 +1035,7 @@ func (db *commonMsgDatabase) RangeUserSendCount(
ase bool,
pageNumber int32,
showNumber int32,
) (msgCount int64, userCount int64, users []*unrelationtb.UserCount, dateCount map[string]int64, err error) {
) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) {
return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber)
}
@ -1047,7 +1046,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount(
ase bool,
pageNumber int32,
showNumber int32,
) (msgCount int64, userCount int64, groups []*unrelationtb.GroupCount, dateCount map[string]int64, err error) {
) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error) {
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
}

@ -26,7 +26,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
)
type UserDatabase interface {
@ -87,10 +86,10 @@ type userDatabase struct {
tx tx.CtxTx
userDB relation.UserModelInterface
cache cache.UserCache
mongoDB unrelationtb.SubscribeUserModelInterface
mongoDB relation.SubscribeUserModelInterface
}
func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB unrelationtb.SubscribeUserModelInterface) UserDatabase {
func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB relation.SubscribeUserModelInterface) UserDatabase {
return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB}
}

@ -2,7 +2,6 @@ package mgo
import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/protocol/constant"
@ -19,8 +18,6 @@ import (
"time"
)
var ErrMsgListNotExist = errors.New("user not have msg in mongoDB")
func NewMsgMongo(db *mongo.Database) (relation.MsgDocModelInterface, error) {
coll := db.Collection(new(relation.MsgDocModel).TableName())
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
@ -227,7 +224,7 @@ func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID strin
if len(msgs) > 0 {
return msgs[0], nil
}
return nil, errs.Wrap(ErrMsgListNotExist)
return nil, errs.Wrap(relation.ErrMsgListNotExist)
}
func (m *MsgMgo) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error {

@ -16,6 +16,7 @@ package relation
import (
"context"
"errors"
"strconv"
"time"
@ -32,6 +33,8 @@ const (
NewestList = -1
)
var ErrMsgListNotExist = errors.New("user not have msg in mongoDB")
type MsgDocModel struct {
DocID string `bson:"doc_id"`
Msg []*MsgInfoModel `bson:"msgs"`

Loading…
Cancel
Save