|
|
|
@ -41,9 +41,9 @@ type CommonMsgDatabase interface {
|
|
|
|
|
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
|
|
|
|
|
|
|
|
|
|
// 通过seqList获取mongo中写扩散消息
|
|
|
|
|
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
|
|
|
|
|
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (minSeq int64, seqMsg []*sdkws.MsgData, err error)
|
|
|
|
|
// 通过seqList获取大群在 mongo里面的消息
|
|
|
|
|
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
|
|
|
|
|
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, seqMsg []*sdkws.MsgData, err error)
|
|
|
|
|
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
|
|
|
|
|
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
|
|
|
|
|
// 用户根据seq删除消息
|
|
|
|
@ -448,21 +448,21 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
|
|
|
|
|
return seqMsgs, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
|
|
|
|
|
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (int64, []*sdkws.MsgData, error) {
|
|
|
|
|
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
|
|
|
|
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
}
|
|
|
|
|
minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
|
|
|
|
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
}
|
|
|
|
|
if userMinSeq < minSeq {
|
|
|
|
|
minSeq = userMinSeq
|
|
|
|
|
}
|
|
|
|
|
if minSeq > end {
|
|
|
|
|
log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end)
|
|
|
|
|
return nil, nil
|
|
|
|
|
return 0, nil, nil
|
|
|
|
|
}
|
|
|
|
|
if begin < minSeq {
|
|
|
|
|
begin = minSeq
|
|
|
|
@ -492,22 +492,22 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
|
|
|
|
mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end)
|
|
|
|
|
if err != nil {
|
|
|
|
|
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
|
|
|
|
return nil, err
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
}
|
|
|
|
|
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
|
|
|
|
successMsgs = append(successMsgs, mongoMsgs...)
|
|
|
|
|
}
|
|
|
|
|
return successMsgs, nil
|
|
|
|
|
return minSeq, successMsgs, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
|
|
|
|
|
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, []*sdkws.MsgData, error) {
|
|
|
|
|
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
|
|
|
|
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
}
|
|
|
|
|
minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
|
|
|
|
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
}
|
|
|
|
|
if userMinSeq < minSeq {
|
|
|
|
|
minSeq = userMinSeq
|
|
|
|
@ -531,12 +531,12 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
|
|
|
|
|
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
|
|
|
|
return nil, err
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
}
|
|
|
|
|
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
|
|
|
|
successMsgs = append(successMsgs, mongoMsgs...)
|
|
|
|
|
}
|
|
|
|
|
return successMsgs, nil
|
|
|
|
|
return minSeq, successMsgs, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error {
|
|
|
|
|