|
|
@ -242,18 +242,18 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
|
|
|
|
doc.DocID = docID
|
|
|
|
doc.DocID = docID
|
|
|
|
doc.Msg = msgsToMongo
|
|
|
|
doc.Msg = msgsToMongo
|
|
|
|
if err = db.mgo.Create(ctx, doc); err != nil {
|
|
|
|
if err = db.mgo.Create(ctx, doc); err != nil {
|
|
|
|
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
//log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
|
|
|
//log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if docIDNext != "" {
|
|
|
|
if docIDNext != "" {
|
|
|
@ -262,11 +262,11 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
|
|
|
|
nextDoc.Msg = msgsToMongoNext
|
|
|
|
nextDoc.Msg = msgsToMongoNext
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
|
|
|
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
|
|
|
|
if err = db.mgo.Create(ctx, nextDoc); err != nil {
|
|
|
|
if err = db.mgo.Create(ctx, nextDoc); err != nil {
|
|
|
|
prome.PromeInc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoFailedCounter)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prome.PromeInc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
prome.Inc(prome.MsgInsertMongoSuccessCounter)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
|
|
|
//log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
@ -296,10 +296,10 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
|
|
|
|
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
|
|
|
|
//log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err != nil && err != redis.Nil {
|
|
|
|
if err != nil && err != redis.Nil {
|
|
|
|
prome.PromeInc(prome.SeqGetFailedCounter)
|
|
|
|
prome.Inc(prome.SeqGetFailedCounter)
|
|
|
|
return 0, utils.Wrap(err, "")
|
|
|
|
return 0, utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prome.PromeInc(prome.SeqGetSuccessCounter)
|
|
|
|
prome.Inc(prome.SeqGetSuccessCounter)
|
|
|
|
lastMaxSeq := currentMaxSeq
|
|
|
|
lastMaxSeq := currentMaxSeq
|
|
|
|
for _, m := range msgList {
|
|
|
|
for _, m := range msgList {
|
|
|
|
currentMaxSeq++
|
|
|
|
currentMaxSeq++
|
|
|
@ -309,10 +309,10 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
|
|
|
|
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
|
|
|
|
//log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList))
|
|
|
|
failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList)
|
|
|
|
failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum)
|
|
|
|
prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
|
|
|
|
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
|
|
|
|
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
prome.PromeInc(prome.MsgInsertRedisSuccessCounter)
|
|
|
|
prome.Inc(prome.MsgInsertRedisSuccessCounter)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
|
|
|
|
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList))
|
|
|
|
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
|
|
|
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
|
|
|
@ -321,9 +321,9 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
|
|
|
|
err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
|
|
|
|
err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
prome.PromeInc(prome.SeqSetFailedCounter)
|
|
|
|
prome.Inc(prome.SeqSetFailedCounter)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
prome.PromeInc(prome.SeqSetSuccessCounter)
|
|
|
|
prome.Inc(prome.SeqSetSuccessCounter)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return lastMaxSeq, utils.Wrap(err, "")
|
|
|
|
return lastMaxSeq, utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -463,18 +463,18 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i
|
|
|
|
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
|
|
|
|
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
if err != redis.Nil {
|
|
|
|
if err != redis.Nil {
|
|
|
|
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
|
|
|
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
|
|
|
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
|
|
|
|
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
|
|
|
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
|
|
|
if len(failedSeqs) > 0 {
|
|
|
|
if len(failedSeqs) > 0 {
|
|
|
|
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion)
|
|
|
|
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
|
|
|
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
|
|
|
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
|
|
|
successMsgs = append(successMsgs, mongoMsgs...)
|
|
|
|
successMsgs = append(successMsgs, mongoMsgs...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return successMsgs, nil
|
|
|
|
return successMsgs, nil
|
|
|
@ -484,18 +484,18 @@ func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin
|
|
|
|
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
|
|
|
|
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
if err != redis.Nil {
|
|
|
|
if err != redis.Nil {
|
|
|
|
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
|
|
|
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
|
|
|
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
|
|
|
|
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
|
|
|
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
|
|
|
if len(failedSeqs) > 0 {
|
|
|
|
if len(failedSeqs) > 0 {
|
|
|
|
mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion)
|
|
|
|
mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
|
|
|
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
|
|
|
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
|
|
|
successMsgs = append(successMsgs, mongoMsgs...)
|
|
|
|
successMsgs = append(successMsgs, mongoMsgs...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return successMsgs, nil
|
|
|
|
return successMsgs, nil
|
|
|
|