|
|
|
@ -7,10 +7,10 @@ import (
|
|
|
|
|
"github.com/openimsdk/protocol/constant"
|
|
|
|
|
"github.com/openimsdk/protocol/msg"
|
|
|
|
|
"github.com/openimsdk/protocol/sdkws"
|
|
|
|
|
"github.com/openimsdk/tools/db/mongoutil"
|
|
|
|
|
"github.com/openimsdk/tools/errs"
|
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
|
"github.com/openimsdk/tools/mgoutil"
|
|
|
|
|
"github.com/openimsdk/tools/utils"
|
|
|
|
|
"github.com/openimsdk/tools/utils/jsonutil"
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
|
|
@ -40,11 +40,11 @@ type MsgMgo struct {
|
|
|
|
|
func (m *MsgMgo) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []relation.MsgInfoModel) error {
|
|
|
|
|
filter := bson.M{"doc_id": docID}
|
|
|
|
|
update := bson.M{"$push": bson.M{"msgs": bson.M{"$each": msgsToMongo}}}
|
|
|
|
|
return mgoutil.UpdateOne(ctx, m.coll, filter, update, false)
|
|
|
|
|
return mongoutil.UpdateOne(ctx, m.coll, filter, update, false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) Create(ctx context.Context, model *relation.MsgDocModel) error {
|
|
|
|
|
return mgoutil.InsertMany(ctx, m.coll, []*relation.MsgDocModel{model})
|
|
|
|
|
return mongoutil.InsertMany(ctx, m.coll, []*relation.MsgDocModel{model})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
|
|
|
|
@ -56,7 +56,7 @@ func (m *MsgMgo) UpdateMsg(ctx context.Context, docID string, index int64, key s
|
|
|
|
|
}
|
|
|
|
|
filter := bson.M{"doc_id": docID}
|
|
|
|
|
update := bson.M{"$set": bson.M{field: value}}
|
|
|
|
|
return mgoutil.UpdateOneResult(ctx, m.coll, filter, update)
|
|
|
|
|
return mongoutil.UpdateOneResult(ctx, m.coll, filter, update)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) {
|
|
|
|
@ -72,21 +72,21 @@ func (m *MsgMgo) PushUnique(ctx context.Context, docID string, index int64, key
|
|
|
|
|
field: bson.M{"$each": value},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
return mgoutil.UpdateOneResult(ctx, m.coll, filter, update)
|
|
|
|
|
return mongoutil.UpdateOneResult(ctx, m.coll, filter, update)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {
|
|
|
|
|
filter := bson.M{"doc_id": docID}
|
|
|
|
|
update := bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}}
|
|
|
|
|
return mgoutil.UpdateOne(ctx, m.coll, filter, update, false)
|
|
|
|
|
return mongoutil.UpdateOne(ctx, m.coll, filter, update, false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) IsExistDocID(ctx context.Context, docID string) (bool, error) {
|
|
|
|
|
return mgoutil.Exist(ctx, m.coll, bson.M{"doc_id": docID})
|
|
|
|
|
return mongoutil.Exist(ctx, m.coll, bson.M{"doc_id": docID})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) FindOneByDocID(ctx context.Context, docID string) (*relation.MsgDocModel, error) {
|
|
|
|
|
return mgoutil.FindOne[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": docID})
|
|
|
|
|
return mongoutil.FindOne[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": docID})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*relation.MsgInfoModel, error) {
|
|
|
|
@ -130,7 +130,7 @@ func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
|
|
|
|
|
{Key: "msgs.del_list", Value: 0},
|
|
|
|
|
}}},
|
|
|
|
|
}
|
|
|
|
|
msgDocModel, err := mgoutil.Aggregate[*relation.MsgDocModel](ctx, m.coll, pipeline)
|
|
|
|
|
msgDocModel, err := mongoutil.Aggregate[*relation.MsgDocModel](ctx, m.coll, pipeline)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -157,14 +157,14 @@ func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
|
|
|
|
|
Seq: msg.Msg.Seq,
|
|
|
|
|
Ex: msg.Msg.Ex,
|
|
|
|
|
}
|
|
|
|
|
data, err := utils.JsonMarshal(&revokeContent)
|
|
|
|
|
data, err := jsonutil.JsonMarshal(&revokeContent)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
|
|
|
|
|
}
|
|
|
|
|
elem := sdkws.NotificationElem{
|
|
|
|
|
Detail: string(data),
|
|
|
|
|
}
|
|
|
|
|
content, err := utils.JsonMarshal(&elem)
|
|
|
|
|
content, err := jsonutil.JsonMarshal(&elem)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
|
|
|
|
|
}
|
|
|
|
@ -208,7 +208,7 @@ func (m *MsgMgo) DeleteDocs(ctx context.Context, docIDs []string) error {
|
|
|
|
|
if len(docIDs) == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return mgoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": bson.M{"$in": docIDs}})
|
|
|
|
|
return mongoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": bson.M{"$in": docIDs}})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*relation.MsgDocModel, error) {
|
|
|
|
@ -217,7 +217,7 @@ func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID strin
|
|
|
|
|
}
|
|
|
|
|
opt := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": sort}).SetLimit(1)
|
|
|
|
|
filter := bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}
|
|
|
|
|
msgs, err := mgoutil.Find[*relation.MsgDocModel](ctx, m.coll, filter, opt)
|
|
|
|
|
msgs, err := mongoutil.Find[*relation.MsgDocModel](ctx, m.coll, filter, opt)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -236,7 +236,7 @@ func (m *MsgMgo) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, in
|
|
|
|
|
"msg": nil,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err := mgoutil.UpdateMany(ctx, m.coll, bson.M{"doc_id": docID}, update)
|
|
|
|
|
_, err := mongoutil.UpdateMany(ctx, m.coll, bson.M{"doc_id": docID}, update)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -311,7 +311,7 @@ func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (
|
|
|
|
|
DocID string `bson:"doc_id"`
|
|
|
|
|
Msg *relation.MsgInfoModel `bson:"msgs"`
|
|
|
|
|
}
|
|
|
|
|
msgsDocs, err := mgoutil.Aggregate[*docModel](ctx, m.coll, pipe)
|
|
|
|
|
msgsDocs, err := mongoutil.Aggregate[*docModel](ctx, m.coll, pipe)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
}
|
|
|
|
@ -335,12 +335,12 @@ func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (
|
|
|
|
|
Seq: msgInfo.Msg.Seq,
|
|
|
|
|
Ex: msgInfo.Msg.Ex,
|
|
|
|
|
}
|
|
|
|
|
data, err := utils.JsonMarshal(&revokeContent)
|
|
|
|
|
data, err := jsonutil.JsonMarshal(&revokeContent)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
|
|
|
|
|
}
|
|
|
|
|
elem := sdkws.NotificationElem{Detail: string(data)}
|
|
|
|
|
content, err := utils.JsonMarshal(&elem)
|
|
|
|
|
content, err := jsonutil.JsonMarshal(&elem)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
|
|
|
|
|
}
|
|
|
|
@ -589,7 +589,7 @@ func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end ti
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
result, err := mgoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
|
|
|
|
result, err := mongoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, 0, nil, nil, err
|
|
|
|
|
}
|
|
|
|
@ -826,7 +826,7 @@ func (m *MsgMgo) RangeGroupSendCount(ctx context.Context, start time.Time, end t
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
result, err := mgoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
|
|
|
|
result, err := mongoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, 0, nil, nil, err
|
|
|
|
|
}
|
|
|
|
@ -850,7 +850,7 @@ func (m *MsgMgo) RangeGroupSendCount(ctx context.Context, start time.Time, end t
|
|
|
|
|
func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
|
|
|
|
|
for _, conversationID := range conversationIDs {
|
|
|
|
|
regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}
|
|
|
|
|
msgDocs, err := mgoutil.Find[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": regex})
|
|
|
|
|
msgDocs, err := mongoutil.Find[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": regex})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
|
|
|
|
|
continue
|
|
|
|
@ -860,7 +860,7 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
|
|
|
|
|
if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) {
|
|
|
|
|
if err := mgoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": regex}); err != nil {
|
|
|
|
|
if err := mongoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": regex}); err != nil {
|
|
|
|
|
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
@ -887,7 +887,7 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err = mgoutil.InsertMany(ctx, m.coll, newMsgDocs); err != nil {
|
|
|
|
|
if err = mongoutil.InsertMany(ctx, m.coll, newMsgDocs); err != nil {
|
|
|
|
|
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
|
|
|
|
} else {
|
|
|
|
|
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs))
|
|
|
|
|