|
|
|
@ -25,7 +25,7 @@ import (
|
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
|
|
|
|
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type NotificationDatabase interface {
|
|
|
|
@ -229,7 +229,6 @@ func (db *notificationDatabase) GetGroupMinSeq(ctx context.Context, groupID stri
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
|
|
|
|
//newTime := utils.GetCurrentTimestampByMill()
|
|
|
|
|
if int64(len(msgList)) > db.msg.GetsingleGocNotificationNum() {
|
|
|
|
|
return errors.New("too large")
|
|
|
|
|
}
|
|
|
|
@ -251,7 +250,6 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, conversa
|
|
|
|
|
docIDNext := ""
|
|
|
|
|
var err error
|
|
|
|
|
for _, m := range msgList {
|
|
|
|
|
//log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
|
|
|
|
|
currentMaxSeq++
|
|
|
|
|
sMsg := unRelationTb.NotificationInfoModel{}
|
|
|
|
|
sMsg.SendTime = m.SendTime
|
|
|
|
@ -263,18 +261,13 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, conversa
|
|
|
|
|
msgsToMongo = append(msgsToMongo, sMsg)
|
|
|
|
|
insertCounter++
|
|
|
|
|
docID = db.msg.GetDocID(conversationID, currentMaxSeq)
|
|
|
|
|
//log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
|
|
|
|
} else {
|
|
|
|
|
msgsToMongoNext = append(msgsToMongoNext, sMsg)
|
|
|
|
|
docIDNext = db.msg.GetDocID(conversationID, currentMaxSeq)
|
|
|
|
|
//log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if docID != "" {
|
|
|
|
|
//filter := bson.M{"uid": seqUid}
|
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
|
|
|
|
|
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
|
|
|
|
|
err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == mongo.ErrNoDocuments {
|
|
|
|
@ -283,13 +276,11 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, conversa
|
|
|
|
|
doc.Msg = msgsToMongo
|
|
|
|
|
if err = db.msgDocDatabase.Create(ctx, doc); err != nil {
|
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
|
} else {
|
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
|
//log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
@ -300,15 +291,12 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, conversa
|
|
|
|
|
nextDoc := &unRelationTb.NotificationDocModel{}
|
|
|
|
|
nextDoc.DocID = docIDNext
|
|
|
|
|
nextDoc.Msg = msgsToMongoNext
|
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
|
|
|
|
if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil {
|
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
|
}
|
|
|
|
|
//log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|