Merge remote-tracking branch 'origin/errcode' into errcode

test-errcode
withchao 2 years ago
commit f517e63fca

@ -56,7 +56,7 @@ type Cache interface {
GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err error) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
CleanUpOneUserAllMsgFromRedis(ctx context.Context, userID string) error CleanUpOneUserAllMsg(ctx context.Context, userID string) error
HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error)
@ -231,27 +231,28 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl
return r.rdb.HDel(context.Background(), key, fields...).Err() return r.rdb.HDel(context.Background(), key, fields...).Err()
} }
func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, errResult error) { func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err2 error) {
for _, v := range seqList { for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v)) key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := r.rdb.Get(context.Background(), key).Result() result, err := r.rdb.Get(context.Background(), key).Result()
if err != nil { if err != nil {
errResult = err if err != redis.Nil {
err2 = err
}
failedSeqList = append(failedSeqList, v) failedSeqList = append(failedSeqList, v)
} else { } else {
msg := sdkws.MsgData{} msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(result, &msg) err = jsonpb.UnmarshalString(result, &msg)
if err != nil { if err != nil {
errResult = err err2 = err
failedSeqList = append(failedSeqList, v) failedSeqList = append(failedSeqList, v)
} else { } else {
seqMsg = append(seqMsg, &msg) seqMsg = append(seqMsg, &msg)
} }
} }
} }
return seqMsg, failedSeqList, errResult return seqMsg, failedSeqList, err2
} }
func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ, uid string) (int, error) { func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ, uid string) (int, error) {
@ -285,7 +286,7 @@ func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string,
return nil return nil
} }
func (r *RedisClient) CleanUpOneUserAllMsgFromRedis(ctx context.Context, userID string) error { func (r *RedisClient) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
key := messageCache + userID + "_" + "*" key := messageCache + userID + "_" + "*"
vals, err := r.rdb.Keys(ctx, key).Result() vals, err := r.rdb.Keys(ctx, key).Result()
if err == redis.Nil { if err == redis.Nil {

@ -41,7 +41,7 @@ type MsgInterface interface {
// 通过seqList获取大群在db里面的消息 // 通过seqList获取大群在db里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/cache/db然后重置seq // 删除用户所有消息/cache/db然后重置seq
CleanUpUserMsgFromMongo(ctx context.Context, userID string) error CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) // 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
@ -91,8 +91,8 @@ func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID stri
return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs) return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs)
} }
func (m *MsgController) CleanUpUserMsgFromMongo(ctx context.Context, userID string) error { func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error {
return m.database.CleanUpUserMsgFromMongo(ctx, userID) return m.database.CleanUpUserMsg(ctx, userID)
} }
func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error { func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error {
@ -121,9 +121,9 @@ type MsgDatabaseInterface interface {
// 通过seqList获取大群在 mongo里面的消息 // 通过seqList获取大群在 mongo里面的消息
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error)
// 删除用户所有消息/redis/mongo然后重置seq // 删除用户所有消息/redis/mongo然后重置seq
CleanUpUserMsgFromMongo(ctx context.Context, userID string) error CleanUpUserMsg(ctx context.Context, userID string) error
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) // 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
} }
@ -410,30 +410,53 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
} }
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) { func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
return db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion) successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, userID, seqs)
if err != nil {
if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
}
}
prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion)
if err != nil {
prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
}
prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
}
return successMsgs, nil
} }
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) { func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) {
return db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion) successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, groupID, seqs)
}
func (db *MsgDatabase) CleanUpUserMsgFromMongo(ctx context.Context, userID string) error {
maxSeq, err := db.msgCache.GetUserMaxSeq(ctx, userID)
if err == redis.Nil {
return nil
}
if err != nil { if err != nil {
return err if err != redis.Nil {
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
}
} }
docIDs := db.msg.GetSeqDocIDList(userID, uint32(maxSeq)) prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
err = db.msgModel.Delete(ctx, docIDs) if len(failedSeqs) > 0 {
if err == mongo.ErrNoDocuments { mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion)
return nil if err != nil {
prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
}
prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
} }
return successMsgs, nil
}
func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0)
if err != nil { if err != nil {
return err return err
} }
err = db.msgCache.SetUserMinSeq(ctx, userID, maxSeq) err = db.msgCache.CleanUpOneUserAllMsg(ctx, userID)
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }

Loading…
Cancel
Save