test-errcode
wangchuxiao 2 years ago
parent d1728463b0
commit 5f783fa078

@ -43,6 +43,9 @@ func StartTransfer(prometheusPort int) error {
if err != nil {
return err
}
if err := mongo.CreateMsgIndex(); err != nil {
return err
}
client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName,
config.Config.Zookeeper.Password), openKeeper.WithTimeout(10))

@ -55,17 +55,6 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
}
for _, v := range msgFromMQ.MsgData {
switch v.ContentType {
case constant.DeleteMessageNotification:
deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil {
log.ZError(ctx, "tips unmarshal err:", err, "msg", msg)
continue
}
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs)
continue
}
case constant.MsgRevokeNotification:
var elem sdkws.NotificationElem
if err := json.Unmarshal(v.Content, &elem); err != nil {

@ -55,6 +55,9 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil {
return err
}
if err := mongo.CreateMsgIndex(); err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())

@ -15,7 +15,7 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)
for _, seq := range req.SeqRanges {
if !utils.IsNotification(seq.ConversationID) {
msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num)
msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, req.UserID, seq.ConversationID, seq.Begin, seq.End, seq.Num)
if err != nil {
log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq)
continue
@ -26,7 +26,7 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
for i := seq.Begin; i <= seq.End; i++ {
seqs = append(seqs, i)
}
notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs)
notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, seq.ConversationID, seqs)
if err != nil {
log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq)
continue

@ -94,74 +94,14 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
return nil, nil
}
return nil, nil
case constant.GroupChatType:
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
return nil, nil
}
userIDList, err := m.GetGroupMemberIDs(ctx, data.MsgData.GroupID)
if err != nil {
return nil, err
}
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
return userIDList, nil
}
if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin {
return userIDList, nil
}
if !utils.IsContain(data.MsgData.SendID, userIDList) {
return nil, errs.ErrNotInGroupYet.Wrap()
}
isMute, err := m.userIsMuteAndIsAdminInGroup(ctx, data.MsgData.GroupID, data.MsgData.SendID)
if err != nil {
return nil, err
}
if isMute {
return nil, errs.ErrMutedInGroup.Wrap()
}
isMute, isAdmin, err := m.groupIsMuted(ctx, data.MsgData.GroupID, data.MsgData.SendID)
if err != nil {
return nil, err
}
if isAdmin {
return userIDList, nil
}
if isMute {
return nil, errs.ErrMutedGroup.Wrap()
}
return userIDList, nil
case constant.SuperGroupChatType:
groupInfo, err := m.Group.GetGroupInfo(ctx, data.MsgData.GroupID)
if err != nil {
return nil, err
}
if data.MsgData.ContentType == constant.AdvancedRevoke {
revokeMessage := new(MessageRevoked)
err := utils.JsonStringToStruct(string(data.MsgData.Content), revokeMessage)
if err != nil {
return nil, errs.ErrArgs.Wrap()
}
if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID {
resp, err := m.MsgDatabase.GetMsgBySeqs(ctx, data.MsgData.GroupID, []int64{int64(revokeMessage.Seq)})
if err != nil {
return nil, err
}
if resp[0].ClientMsgID == revokeMessage.ClientMsgID && resp[0].Seq == int64(revokeMessage.Seq) {
revokeMessage.SourceMessageSendTime = resp[0].SendTime
revokeMessage.SourceMessageSenderNickname = resp[0].SenderNickname
revokeMessage.SourceMessageSendID = resp[0].SendID
data.MsgData.Content = []byte(utils.StructToJsonString(revokeMessage))
} else {
return nil, errs.ErrData.Wrap("MsgData")
}
}
}
if groupInfo.GroupType == constant.SuperGroup {
return nil, nil
}
userIDList, err := m.GetGroupMemberIDs(ctx, data.MsgData.GroupID)
if err != nil {
return nil, err

@ -7,10 +7,8 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"go.mongodb.org/mongo-driver/bson"
"google.golang.org/protobuf/proto"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
@ -22,10 +20,10 @@ import (
func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, conversationID string) *unRelationTb.MsgDocModel {
msgDoc := &unRelationTb.MsgDocModel{DocID: conversationID + strconv.Itoa(int(index))}
for i := 0; i < 5000; i++ {
msgDoc.Msg = append(msgDoc.Msg, unRelationTb.MsgInfoModel{SendTime: 0, Msg: []byte{}})
msgDoc.Msg = append(msgDoc.Msg, &unRelationTb.MsgInfoModel{})
}
for i := startSeq; i <= stopSeq; i++ {
msg := sdkws.MsgData{
msg := &unRelationTb.MsgDataModel{
SendID: "sendID1",
RecvID: "recvID1",
GroupID: "",
@ -37,20 +35,17 @@ func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, conversationID string) *u
SessionType: 1,
MsgFrom: 100,
ContentType: 101,
Content: []byte("testFaceURL"),
Content: "testContent",
Seq: i,
SendTime: time.Now().Unix(),
CreateTime: time.Now().Unix(),
Status: 1,
}
bytes, _ := proto.Marshal(&msg)
var sendTime int64
if i <= delSeq {
sendTime = 10000
msg.SendTime = 10000
} else {
sendTime = utils.GetCurrentTimestampByMill()
msg.SendTime = utils.GetCurrentTimestampByMill()
}
msgDoc.Msg[i-1] = unRelationTb.MsgInfoModel{SendTime: int64(sendTime), Msg: bytes}
msgDoc.Msg[i-1] = &unRelationTb.MsgInfoModel{Msg: msg}
}
return msgDoc
}

@ -33,7 +33,7 @@ const (
//SysRelated
NotificationBegin = 1000
DeleteMessageNotification = 1100
FriendApplicationApprovedNotification = 1201 //add_friend_response
FriendApplicationRejectedNotification = 1202 //add_friend_response
FriendApplicationNotification = 1203 //add_friend

@ -0,0 +1,74 @@
package convert
import (
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
)
func MsgPb2DB(msg *sdkws.MsgData) unrelation.MsgDataModel {
var msgDataModel unrelation.MsgDataModel
msgDataModel.SendID = msg.SendID
msgDataModel.RecvID = msg.RecvID
msgDataModel.GroupID = msg.GroupID
msgDataModel.ClientMsgID = msg.ClientMsgID
msgDataModel.ServerMsgID = msg.ServerMsgID
msgDataModel.SenderPlatformID = msg.SenderPlatformID
msgDataModel.SenderNickname = msg.SenderNickname
msgDataModel.SenderFaceURL = msg.SenderFaceURL
msgDataModel.SessionType = msg.SessionType
msgDataModel.MsgFrom = msg.MsgFrom
msgDataModel.ContentType = msg.ContentType
msgDataModel.Content = string(msg.Content)
msgDataModel.Seq = msg.Seq
msgDataModel.SendTime = msg.SendTime
msgDataModel.CreateTime = msg.CreateTime
msgDataModel.Status = msg.Status
msgDataModel.Options = msg.Options
msgDataModel.OfflinePush = &unrelation.OfflinePushModel{
Title: msg.OfflinePushInfo.Title,
Desc: msg.OfflinePushInfo.Desc,
Ex: msg.OfflinePushInfo.Ex,
IOSPushSound: msg.OfflinePushInfo.IOSPushSound,
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
}
msgDataModel.AtUserIDList = msg.AtUserIDList
msgDataModel.AttachedInfo = msg.AttachedInfo
msgDataModel.Ex = msg.Ex
return msgDataModel
}
func MsgDB2Pb(msgModel *unrelation.MsgDataModel) *sdkws.MsgData {
var msg sdkws.MsgData
msg.SendID = msgModel.SendID
msg.RecvID = msgModel.RecvID
msg.GroupID = msgModel.GroupID
msg.ClientMsgID = msgModel.ClientMsgID
msg.ServerMsgID = msgModel.ServerMsgID
msg.SenderPlatformID = msgModel.SenderPlatformID
msg.SenderNickname = msgModel.SenderNickname
msg.SenderFaceURL = msgModel.SenderFaceURL
msg.SessionType = msgModel.SessionType
msg.MsgFrom = msgModel.MsgFrom
msg.ContentType = msgModel.ContentType
b, _ := json.Marshal(msgModel.Content)
msg.Content = b
msg.Seq = msgModel.Seq
msg.SendTime = msgModel.SendTime
msg.CreateTime = msgModel.CreateTime
msg.Status = msgModel.Status
msg.Options = msgModel.Options
msg.OfflinePushInfo = &sdkws.OfflinePushInfo{
Title: msgModel.OfflinePush.Title,
Desc: msgModel.OfflinePush.Desc,
Ex: msgModel.OfflinePush.Ex,
IOSPushSound: msgModel.OfflinePush.IOSPushSound,
IOSBadgeCount: msgModel.OfflinePush.IOSBadgeCount,
}
msg.AtUserIDList = msgModel.AtUserIDList
msg.AttachedInfo = msgModel.AttachedInfo
msg.Ex = msgModel.Ex
return &msg
}

@ -91,7 +91,7 @@ type MsgModel interface {
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error)
DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel
}
@ -558,8 +558,8 @@ func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string {
return messageReadCache + docID + "_" + strconv.Itoa(int(seq))
}
func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error) {
key := c.getMsgReadCacheKey(utils.GetConversationIDByMsg(msg), msg.Seq)
func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (int, error) {
key := c.getMsgReadCacheKey(utils.GetConversationIDByMsgModel(msg.Msg), msg.Msg.Seq)
for i, _key := range keys {
if key == _key {
return i, nil
@ -568,12 +568,12 @@ func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error)
return 0, errIndex
}
func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) {
func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) {
var keys []string
for _, seq := range seqs {
keys = append(keys, c.getMsgReadCacheKey(docID, seq))
}
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*sdkws.MsgData, error) {
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) {
return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
})
}

@ -4,11 +4,11 @@ import (
"fmt"
"sort"
"strconv"
"sync"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
@ -16,7 +16,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/gogo/protobuf/sortkeys"
"context"
"errors"
@ -37,15 +36,15 @@ type CommonMsgDatabase interface {
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
// 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 通过seqList获取mongo中写扩散消息
GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
// 通过seqList获取大群在 mongo里面的消息
GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
// 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
CleanUpUserConversationsMsgs(ctx context.Context, userID string, conversationIDs []string)
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
@ -168,8 +167,8 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke)
} else if msg.DelList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList)
} else if msg.ReadList != nil {
res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList)
// } else if msg.ReadList != nil {
// res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList)
} else {
return false, errs.ErrArgs.Wrap("msg all field is nil")
}
@ -194,7 +193,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
}
doc := unRelationTb.MsgDocModel{
DocID: docID,
Msg: make([]unRelationTb.MsgInfoModel, num),
Msg: make([]*unRelationTb.MsgInfoModel, num),
}
var insert int
for j := i; j < len(msgList); j++ {
@ -203,15 +202,15 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
break
}
insert++
doc.Msg[getIndex(seq)] = *msgList[j]
doc.Msg[getIndex(seq)] = msgList[j]
}
for i, model := range doc.Msg {
if model.DelList == nil {
doc.Msg[i].DelList = []string{}
}
if model.ReadList == nil {
doc.Msg[i].ReadList = []string{}
}
// if model.ReadList == nil {
// doc.Msg[i].ReadList = []string{}
// }
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) {
@ -364,76 +363,17 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
return lastMaxSeq, isNew, utils.Wrap(err, "")
}
func (db *commonMsgDatabase) DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
sortkeys.Int64s(seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(conversationID, seqs)
lock := sync.Mutex{}
var wg sync.WaitGroup
wg.Add(len(docIDSeqsMap))
for k, v := range docIDSeqsMap {
go func(docID string, seqs []int64) {
defer wg.Done()
unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return
}
lock.Lock()
totalUnExistSeqs = append(totalUnExistSeqs, unExistSeqList...)
lock.Unlock()
}(k, v)
}
return totalUnExistSeqs, nil
}
func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
}
for i, v := range seqMsgs {
if err = db.msgDocDatabase.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
log.ZError(ctx, "UpdateMsgStatusByIndexInOneDoc", err, "docID", docID, "msg", v, "index", indexes[i])
}
}
return unExistSeqs, nil
}
func (db *commonMsgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
if err != nil {
return nil, err
}
return db.unmarshalMsg(msgInfo)
}
func (db *commonMsgDatabase) GetOldestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
if err != nil {
return nil, err
}
return db.unmarshalMsg(msgInfo)
}
func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
msgPb = &sdkws.MsgData{}
// todo: unmarshal
//err = proto.Unmarshal(msgInfo.Msg, msgPb)
//if err != nil {
// return nil, utils.Wrap(err, "")
//}
return msgPb, nil
}
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
m := db.msg.GetDocIDSeqsMap(conversationID, seqs)
var totalUnExistSeqs []int64
for docID, seqs := range m {
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
seqMsgs, unexistSeqs, err := db.findMsgBySeq(ctx, docID, seqs)
msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs)
if err != nil {
return nil, err
}
totalMsgs = append(totalMsgs, seqMsgs...)
for _, msg := range msgs {
totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg))
}
totalUnExistSeqs = append(totalUnExistSeqs, unexistSeqs...)
}
for _, unexistSeq := range totalUnExistSeqs {
@ -442,7 +382,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st
return totalMsgs, nil
}
func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*sdkws.MsgData, err error) {
func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) {
var reFetchSeqs []int64
if delNums > 0 {
newBeginSeq := rangeBegin - delNums
@ -457,18 +397,18 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio
return
}
if len(reFetchSeqs) > 0 {
m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
for docID, seqs := range m {
msgs, _, err := db.findMsgBySeq(ctx, docID, seqs)
if err != nil {
return nil, err
}
for _, msg := range msgs {
if msg.Status != constant.MsgDeleted {
seqMsgs = append(seqMsgs, msg)
}
}
}
// m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
// for docID, seqs := range m {
// msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs)
// if err != nil {
// return nil, err
// }
// for _, msg := range msgs {
// if msg.Status != constant.MsgDeleted {
// seqMsgs = append(seqMsgs, msg)
// }
// }
// }
}
if len(seqMsgs) < int(delNums) {
seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin)
@ -480,19 +420,19 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio
return seqMsgs, nil
}
func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) {
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) {
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
if err != nil {
return nil, nil, err
}
log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs))
seqMsgs = append(seqMsgs, msgs...)
log.ZDebug(ctx, "findMsgInfoBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs))
totalMsgs = append(totalMsgs, msgs...)
if len(msgs) == 0 {
unExistSeqs = seqs
} else {
for _, seq := range seqs {
for i, msg := range msgs {
if seq == msg.Seq {
if seq == msg.Msg.Seq {
break
}
if i == len(msgs)-1 {
@ -501,50 +441,27 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq
}
}
}
msgs, _, unExistSeqs, err = db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
if err != nil {
return nil, nil, err
}
seqMsgs = append(seqMsgs, msgs...)
return seqMsgs, unExistSeqs, nil
return totalMsgs, unExistSeqs, nil
}
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) {
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) {
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
m := db.msg.GetDocIDSeqsMap(conversationID, allSeqs)
var totalNotExistSeqs []int64
// mongo index
for docID, seqs := range m {
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, notExistSeqs, err := db.findMsgBySeq(ctx, docID, seqs)
msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs)
if err != nil {
return nil, err
}
log.ZDebug(ctx, "getMsgBySeqsRange", "unExistSeqs", notExistSeqs, "msgs", len(msgs))
seqMsgs = append(seqMsgs, msgs...)
for _, msg := range msgs {
seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg))
}
totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...)
}
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs)
// find by next doc
var missedSeqs []int64
if len(totalNotExistSeqs) > 0 {
m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs)
for docID, seqs := range m {
docID = db.msg.ToNextDoc(docID)
msgs, _, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
missedSeqs = append(missedSeqs, seqs...)
log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs)
continue
}
missedSeqs = append(missedSeqs, unExistSeqs...)
seqMsgs = append(seqMsgs, msgs...)
if len(unExistSeqs) > 0 {
log.ZWarn(ctx, "some seqs lost in mongo", err, "docID", docID, "seqs", seqs, "unExistSeqs", unExistSeqs)
}
}
}
seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(missedSeqs)...)
seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(totalNotExistSeqs)...)
var delSeqs []int64
for _, msg := range seqMsgs {
if msg.Status == constant.MsgDeleted {
@ -556,7 +473,9 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
if err != nil {
log.ZWarn(ctx, "refetchDelSeqsMsgs", err, "delSeqs", delSeqs, "begin", begin)
}
seqMsgs = append(seqMsgs, msgs...)
for _, msg := range msgs {
seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg))
}
}
// sort by seq
if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
@ -565,7 +484,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
return seqMsgs, nil
}
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
var seqs []int64
for i := end; i > end-num; i-- {
if i >= begin {
@ -587,7 +506,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversation
// get from cache or db
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqsRange(ctx, conversationID, failedSeqs, begin, end)
mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end)
if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
@ -598,8 +517,25 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversation
return successMsgs, nil
}
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil {
return nil, err
}
minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
if err != nil {
return nil, err
}
if userMinSeq < minSeq {
minSeq = userMinSeq
}
var newSeqs []int64
for _, seq := range seqs {
if seq >= minSeq {
newSeqs = append(newSeqs, seq)
}
}
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs)
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
@ -608,7 +544,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, conversationID st
}
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, seqs)
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs)
if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
@ -621,7 +557,8 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, conversationID st
func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error {
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, conversationID, unRelationTb.OldestList, &delStruct, remainTime)
var skip int64
minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime)
if err != nil {
return err
}
@ -653,69 +590,56 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// recursion 删除list并且返回设置的最小seq
func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
//msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index)
//if err != nil || msgs.DocID == "" {
// if err != nil {
// if err == unrelation.ErrMsgListNotExist {
// log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
// } else {
// log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
// }
// }
// // 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
// err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs)
// if err != nil {
// return 0, err
// }
// return delStruct.getSetMinSeq() + 1, nil
//}
//log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg))
//if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
// log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
//}
//if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
// delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID)
// lastMsgPb := &sdkws.MsgData{}
// err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
// if err != nil {
// log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID)
// return 0, utils.Wrap(err, "proto.Unmarshal failed")
// }
// delStruct.minSeq = lastMsgPb.Seq
//} else {
// var hasMarkDelFlag bool
// for i, msg := range msgs.Msg {
// if msg.SendTime != 0 {
// msgPb := &sdkws.MsgData{}
// err = proto.Unmarshal(msg.Msg, msgPb)
// if err != nil {
// log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID)
// return 0, utils.Wrap(err, "proto.Unmarshal failed")
// }
// if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
// msgPb.Status = constant.MsgDeleted
// bytes, _ := proto.Marshal(msgPb)
// msg.Msg = bytes
// msg.SendTime = 0
// hasMarkDelFlag = true
// } else {
// // 到本条消息不需要删除, minSeq置为这条消息的seq
// if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
// return 0, err
// }
// if hasMarkDelFlag {
// if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
}
}
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs)
if err != nil {
return 0, err
}
return delStruct.getSetMinSeq() + 1, nil
}
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg))
if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID)
}
if msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgDocModel.IsFull() {
delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID)
delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq
} else {
var hasMarkDelFlag bool
var delMsgIndexs []int
for i, MsgInfoModel := range msgDocModel.Msg {
if MsgInfoModel != nil {
if utils.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) {
delMsgIndexs = append(delMsgIndexs, i)
hasMarkDelFlag = true
} else {
// 到本条消息不需要删除, minSeq置为这条消息的seq
if err := db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs); err != nil {
return 0, err
}
if hasMarkDelFlag {
// mark del all delMsgIndexs
// if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgDocModel); err != nil {
// return delStruct.getSetMinSeq(), err
// }
// }
// return msgPb.Seq, nil
// }
// }
// }
//}
//// 继续递归 index+1
//seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
//return seq, err
}
return MsgInfoModel.Msg.Seq, nil
}
}
}
}
// 继续递归 index+1
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err
return 0, nil
}
@ -805,20 +729,12 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation
if err != nil {
return
}
msgPb, err := db.unmarshalMsg(oldestMsgMongo)
if err != nil {
return
}
minSeqMongo = msgPb.Seq
minSeqMongo = oldestMsgMongo.Msg.Seq
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
if err != nil {
return
}
msgPb, err = db.unmarshalMsg(newestMsgMongo)
if err != nil {
return
}
maxSeqMongo = msgPb.Seq
maxSeqMongo = newestMsgMongo.Msg.Seq
return
}

@ -2,10 +2,11 @@ package unrelation
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"go.mongodb.org/mongo-driver/mongo"
"strconv"
"strings"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"go.mongodb.org/mongo-driver/mongo"
)
const (
@ -17,7 +18,7 @@ const (
type MsgDocModel struct {
DocID string `bson:"doc_id"`
Msg []MsgInfoModel `bson:"msgs"`
Msg []*MsgInfoModel `bson:"msgs"`
}
type RevokeModel struct {
@ -62,7 +63,6 @@ type MsgInfoModel struct {
Msg *MsgDataModel `bson:"msg"`
Revoke *RevokeModel `bson:"revoke"`
DelList []string `bson:"del_list"`
ReadList []string `bson:"read_list"`
}
type MsgDocModelInterface interface {
@ -72,15 +72,12 @@ type MsgDocModelInterface interface {
PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error)
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*MsgInfoModel, error)
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
Delete(ctx context.Context, docIDs []string) error
GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*MsgDocModel, error)
UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error
DeleteDocs(ctx context.Context, docIDs []string) error
GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*MsgDocModel, error)
}
func (MsgDocModel) TableName() string {
@ -92,8 +89,7 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 {
}
func (m *MsgDocModel) IsFull() bool {
//return m.Msg[len(m.Msg)-1].SendTime != 0
return false
return m.Msg[len(m.Msg)-1].Msg != nil
}
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {
@ -153,9 +149,9 @@ func (m MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
exceptionMsg = append(exceptionMsg, msg)
msgModel := new(sdkws.MsgData)
msgModel.Seq = v
exceptionMsg = append(exceptionMsg, msgModel)
}
return exceptionMsg
}

@ -65,7 +65,7 @@ func (m *Mongo) GetDatabase() *mongo.Database {
}
func (m *Mongo) CreateMsgIndex() error {
return m.createMongoIndex(unrelation.Msg, false, "uid")
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
}
func (m *Mongo) CreateSuperGroupIndex() error {

@ -6,6 +6,7 @@ import (
"fmt"
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -17,23 +18,14 @@ import (
)
var ErrMsgListNotExist = errors.New("user not have msg in mongoDB")
var ErrMsgNotFound = errors.New("msg not found")
type MsgMongoDriver struct {
MsgCollection *mongo.Collection
msg table.MsgDocModel
model table.MsgDocModel
}
func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface {
collection := database.Collection(table.MsgDocModel{}.TableName())
indexModel := mongo.IndexModel{
Keys: bson.M{"doc_id": 1},
Options: options.Index().SetUnique(true),
}
_, err := collection.Indexes().CreateOne(context.Background(), indexModel)
if err != nil {
panic(err)
}
return &MsgMongoDriver{MsgCollection: collection}
}
@ -110,45 +102,17 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab
return doc, err
}
func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
//doc, err := m.FindOneByDocID(ctx, docID)
//if err != nil {
// return nil, nil, nil, err
//}
//singleCount := 0
//var hasSeqList []int64
//for i := 0; i < len(doc.Msg); i++ {
// var msg sdkws.MsgData
// if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil {
// return nil, nil, nil, err
// }
// if utils.Contain(msg.Seq, seqs...) {
// indexes = append(indexes, i)
// seqMsgs = append(seqMsgs, &msg)
// hasSeqList = append(hasSeqList, msg.Seq)
// singleCount++
// if singleCount == len(seqs) {
// break
// }
// }
//}
//for _, i := range seqs {
// if utils.Contain(i, hasSeqList...) {
// continue
// }
// unExistSeqs = append(unExistSeqs, i)
//}
return seqMsgs, indexes, unExistSeqs, nil
}
func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.MsgDocModel, error) {
findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": 1})
func (m *MsgMongoDriver) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*table.MsgDocModel, error) {
if sort != 1 && sort != -1 {
return nil, errs.ErrArgs.Wrap("mongo sort must be 1 or -1")
}
findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": sort})
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
var msgs []table.MsgDocModel
err = cursor.All(context.Background(), &msgs)
err = cursor.All(ctx, &msgs)
if err != nil {
return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
}
@ -159,53 +123,38 @@ func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID stri
}
func (m *MsgMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) {
var msgDocs []table.MsgDocModel
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": -1}))
var skip int64 = 0
for {
msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, -1)
if err != nil {
return nil, utils.Wrap(err, "")
return nil, err
}
err = cursor.All(ctx, &msgDocs)
if err != nil {
return nil, utils.Wrap(err, "")
for i := len(msgDocModel.Msg) - 1; i >= 0; i-- {
if msgDocModel.Msg[i].Msg != nil {
return msgDocModel.Msg[i], nil
}
if len(msgDocs) > 0 {
if len(msgDocs[0].Msg) > 0 {
return &msgDocs[0].Msg[len(msgDocs[0].Msg)-1], nil
}
return nil, errs.ErrRecordNotFound.Wrap("len(msgDocs[0].Msgs) < 0")
skip++
}
return nil, ErrMsgNotFound
}
func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) {
//var msgDocs []table.MsgDocModel
//cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": 1}))
//if err != nil {
// return nil, err
//}
//err = cursor.All(ctx, &msgDocs)
//if err != nil {
// return nil, utils.Wrap(err, "")
//}
//var oldestMsg table.MsgInfoModel
//if len(msgDocs) > 0 {
// for _, v := range msgDocs[0].Msg {
// if v.SendTime != 0 {
// oldestMsg = v
// break
// }
// }
// if len(oldestMsg.Msg) == 0 {
// if len(msgDocs[0].Msg) > 0 {
// oldestMsg = msgDocs[0].Msg[0]
// }
// }
// return &oldestMsg, nil
//}
return nil, ErrMsgNotFound
var skip int64 = 0
for {
msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, 1)
if err != nil {
return nil, err
}
for i, v := range msgDocModel.Msg {
if v.Msg != nil {
return msgDocModel.Msg[i], nil
}
}
skip++
}
}
func (m *MsgMongoDriver) Delete(ctx context.Context, docIDs []string) error {
func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error {
if docIDs == nil {
return nil
}
@ -213,56 +162,47 @@ func (m *MsgMongoDriver) Delete(ctx context.Context, docIDs []string) error {
return err
}
func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocModel) error {
_, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": msg.DocID}, bson.M{"$set": bson.M{"msgs": msg.Msg}})
return err
}
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) {
//beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs)
//beginIndex := m.msg.GetMsgIndex(beginSeq)
//num := endSeq - beginSeq + 1
//pipeline := bson.A{
// bson.M{
// "$match": bson.M{"doc_id": docID},
// },
// bson.M{
// "$project": bson.M{
// "msgs": bson.M{
// "$slice": bson.A{"$msgs", beginIndex, num},
// },
// },
// },
//}
//cursor, err := m.MsgCollection.Aggregate(ctx, pipeline)
//if err != nil {
// return nil, errs.Wrap(err)
//}
//defer cursor.Close(ctx)
//var doc table.MsgDocModel
//i := 0
//for cursor.Next(ctx) {
// err := cursor.Decode(&doc)
// if err != nil {
// return nil, err
// }
// if i == 0 {
// break
// }
//}
//log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID)
//for _, v := range doc.Msg {
// var msg sdkws.MsgData
// if err := proto.Unmarshal(v.Msg, &msg); err != nil {
// return nil, err
// }
// if msg.Seq >= beginSeq && msg.Seq <= endSeq {
// log.ZDebug(ctx, "find msg", "msg", &msg)
// msgs = append(msgs, &msg)
// } else {
// log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg)
// }
//}
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) {
beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs)
beginIndex := m.model.GetMsgIndex(beginSeq)
num := endSeq - beginSeq + 1
pipeline := bson.A{
bson.M{
"$match": bson.M{"doc_id": docID},
},
bson.M{
"$project": bson.M{
"msgs": bson.M{
"$slice": bson.A{"$msgs", beginIndex, num},
},
},
},
}
cursor, err := m.MsgCollection.Aggregate(ctx, pipeline)
if err != nil {
return nil, errs.Wrap(err)
}
defer cursor.Close(ctx)
var doc table.MsgDocModel
i := 0
for cursor.Next(ctx) {
err := cursor.Decode(&doc)
if err != nil {
return nil, err
}
if i == 0 {
break
}
}
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID)
for _, v := range doc.Msg {
if v.Msg.Seq >= beginSeq && v.Msg.Seq <= endSeq {
log.ZDebug(ctx, "find msg", "msg", v.Msg)
msgs = append(msgs, v)
} else {
log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", v.Msg)
}
}
return msgs, nil
}

@ -10,6 +10,7 @@ import (
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
@ -230,6 +231,35 @@ func GenConversationUniqueKey(msg *sdkws.MsgData) string {
return ""
}
func GetConversationIDByMsgModel(msg *unrelation.MsgDataModel) string {
options := Options(msg.Options)
switch msg.SessionType {
case constant.SingleChatType:
l := []string{msg.SendID, msg.RecvID}
sort.Strings(l)
if !options.IsNotNotification() {
return "n_" + strings.Join(l, "_")
}
return "si_" + strings.Join(l, "_") // single chat
case constant.GroupChatType:
if !options.IsNotNotification() {
return "n_" + msg.GroupID // group chat
}
return "g_" + msg.GroupID // group chat
case constant.SuperGroupChatType:
if !options.IsNotNotification() {
return "n_" + msg.GroupID // super group chat
}
return "sg_" + msg.GroupID // super group chat
case constant.NotificationChatType:
if !options.IsNotNotification() {
return "n_" + msg.SendID + "_" + msg.RecvID // super group chat
}
return "sn_" + msg.SendID + "_" + msg.RecvID // server notification chat
}
return ""
}
func GetConversationIDByMsg(msg *sdkws.MsgData) string {
options := Options(msg.Options)
switch msg.SessionType {

Loading…
Cancel
Save