|
|
@ -77,8 +77,8 @@ type MsgDatabase interface {
|
|
|
|
|
|
|
|
|
|
|
|
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase {
|
|
|
|
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase {
|
|
|
|
return &msgDatabase{
|
|
|
|
return &msgDatabase{
|
|
|
|
msgDocModel: msgDocModel,
|
|
|
|
msgDocDatabase: msgDocModel,
|
|
|
|
cache: cacheModel,
|
|
|
|
cache: cacheModel,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -90,22 +90,11 @@ func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDat
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type msgDatabase struct {
|
|
|
|
type msgDatabase struct {
|
|
|
|
msgDocModel unRelationTb.MsgDocModelInterface
|
|
|
|
msgDocDatabase unRelationTb.MsgDocModelInterface
|
|
|
|
cache cache.Model
|
|
|
|
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
|
|
|
|
msg unRelationTb.MsgDocModel
|
|
|
|
cache cache.Model
|
|
|
|
extendMsgModel unRelationTb.ExtendMsgSetModelInterface
|
|
|
|
msg unRelationTb.MsgDocModel
|
|
|
|
}
|
|
|
|
extendMsgSetModel unRelationTb.ExtendMsgSetModel
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]*unRelationTb.KeyValueModel {
|
|
|
|
|
|
|
|
r := make(map[string]*unRelationTb.KeyValueModel)
|
|
|
|
|
|
|
|
for key, value := range reactionExtensionList {
|
|
|
|
|
|
|
|
r[key] = &unRelationTb.KeyValueModel{
|
|
|
|
|
|
|
|
TypeKey: value.TypeKey,
|
|
|
|
|
|
|
|
Value: value.Value,
|
|
|
|
|
|
|
|
LatestUpdateTime: value.LatestUpdateTime,
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return r
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
|
|
|
|
func (db *msgDatabase) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
|
|
|
@ -133,11 +122,11 @@ func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID stri
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
|
|
|
func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
|
|
|
return db.extendMsgModel.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensions))
|
|
|
|
return db.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
|
|
|
|
func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
|
|
|
|
extendMsgSet, err := db.extendMsgModel.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
|
|
|
|
extendMsgSet, err := db.extendMsgDatabase.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -165,7 +154,7 @@ func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessio
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
|
|
|
func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
|
|
|
|
return db.extendMsgModel.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensions))
|
|
|
|
return db.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
|
|
|
|
func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
|
|
|
@ -244,13 +233,13 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
|
|
|
|
//filter := bson.M{"uid": seqUid}
|
|
|
|
//filter := bson.M{"uid": seqUid}
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
|
|
|
|
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
|
|
|
|
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
|
|
|
|
err = db.msgDocModel.PushMsgsToDoc(ctx, docID, msgsToMongo)
|
|
|
|
err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
if err == mongo.ErrNoDocuments {
|
|
|
|
if err == mongo.ErrNoDocuments {
|
|
|
|
doc := &unRelationTb.MsgDocModel{}
|
|
|
|
doc := &unRelationTb.MsgDocModel{}
|
|
|
|
doc.DocID = docID
|
|
|
|
doc.DocID = docID
|
|
|
|
doc.Msg = msgsToMongo
|
|
|
|
doc.Msg = msgsToMongo
|
|
|
|
if err = db.msgDocModel.Create(ctx, doc); err != nil {
|
|
|
|
if err = db.msgDocDatabase.Create(ctx, doc); err != nil {
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
return utils.Wrap(err, "")
|
|
|
@ -270,7 +259,7 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
|
|
|
|
nextDoc.DocID = docIDNext
|
|
|
|
nextDoc.DocID = docIDNext
|
|
|
|
nextDoc.Msg = msgsToMongoNext
|
|
|
|
nextDoc.Msg = msgsToMongoNext
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
|
|
|
if err = db.msgDocModel.Create(ctx, nextDoc); err != nil {
|
|
|
|
if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil {
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
return utils.Wrap(err, "")
|
|
|
@ -364,7 +353,7 @@ func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for i, v := range seqMsgs {
|
|
|
|
for i, v := range seqMsgs {
|
|
|
|
if err = db.msgDocModel.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
|
|
|
|
if err = db.msgDocDatabase.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -372,7 +361,7 @@ func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
|
|
|
|
func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
|
|
|
|
doc, err := db.msgDocModel.FindOneByDocID(ctx, docID)
|
|
|
|
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, nil, err
|
|
|
|
return nil, nil, nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -403,7 +392,7 @@ func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
|
|
|
func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
|
|
|
msgInfo, err := db.msgDocModel.GetNewestMsg(ctx, sourceID)
|
|
|
|
msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, sourceID)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -411,7 +400,7 @@ func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
|
|
|
func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
|
|
|
|
msgInfo, err := db.msgDocModel.GetOldestMsg(ctx, sourceID)
|
|
|
|
msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, sourceID)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -432,7 +421,7 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
|
|
|
|
singleCount := 0
|
|
|
|
singleCount := 0
|
|
|
|
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
|
|
|
|
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
|
|
|
|
for docID, value := range m {
|
|
|
|
for docID, value := range m {
|
|
|
|
doc, err := db.msgDocModel.FindOneByDocID(ctx, docID)
|
|
|
|
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
|
|
|
|
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
|
|
|
|
continue
|
|
|
|
continue
|
|
|
@ -575,7 +564,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
|
|
|
|
// recursion 删除list并且返回设置的最小seq
|
|
|
|
// recursion 删除list并且返回设置的最小seq
|
|
|
|
func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
|
|
|
|
func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
|
|
|
|
// find from oldest list
|
|
|
|
// find from oldest list
|
|
|
|
msgs, err := db.msgDocModel.GetMsgsByIndex(ctx, sourceID, index)
|
|
|
|
msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, sourceID, index)
|
|
|
|
if err != nil || msgs.DocID == "" {
|
|
|
|
if err != nil || msgs.DocID == "" {
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
if err == unrelation.ErrMsgListNotExist {
|
|
|
|
if err == unrelation.ErrMsgListNotExist {
|
|
|
@ -585,7 +574,7 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList)
|
|
|
|
// 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList)
|
|
|
|
err = db.msgDocModel.Delete(ctx, delStruct.delDocIDs)
|
|
|
|
err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -620,11 +609,11 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string,
|
|
|
|
msg.SendTime = 0
|
|
|
|
msg.SendTime = 0
|
|
|
|
hasMarkDelFlag = true
|
|
|
|
hasMarkDelFlag = true
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
if err := db.msgDocModel.Delete(ctx, delStruct.delDocIDs); err != nil {
|
|
|
|
if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
|
|
|
|
return 0, err
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if hasMarkDelFlag {
|
|
|
|
if hasMarkDelFlag {
|
|
|
|
if err := db.msgDocModel.UpdateOneDoc(ctx, msgs); err != nil {
|
|
|
|
if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
|
|
|
|
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
|
|
|
|
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -668,7 +657,7 @@ func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
|
|
|
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
|
|
|
oldestMsgMongo, err := db.msgDocModel.GetOldestMsg(ctx, sourceID)
|
|
|
|
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, sourceID)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return 0, 0, err
|
|
|
|
return 0, 0, err
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -677,7 +666,7 @@ func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (
|
|
|
|
return 0, 0, err
|
|
|
|
return 0, 0, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
minSeqMongo = msgPb.Seq
|
|
|
|
minSeqMongo = msgPb.Seq
|
|
|
|
newestMsgMongo, err := db.msgDocModel.GetNewestMsg(ctx, sourceID)
|
|
|
|
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, sourceID)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return 0, 0, err
|
|
|
|
return 0, 0, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|