From 52a0a8aca4f3564b31aee552bfb46460a2b182bb Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 15 Feb 2023 16:09:00 +0800 Subject: [PATCH 1/2] errcode --- pkg/common/db/controller/msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 2f2c19cc7..50c8a3121 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -123,7 +123,7 @@ type MsgDatabaseInterface interface { // 删除用户所有消息/redis/mongo然后重置seq CleanUpUserMsgFromMongo(ctx context.Context, userID string) error // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) - DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error + DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error } From 0d0a90a14bacfd8cd3fa15b144ad96b36b8b28b8 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 15 Feb 2023 17:00:43 +0800 Subject: [PATCH 2/2] errcode --- pkg/common/db/cache/redis.go | 15 ++++---- pkg/common/db/controller/msg.go | 61 +++++++++++++++++++++++---------- 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 7ad55371b..e68ece6e7 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -56,7 +56,7 @@ type Cache interface { GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err error) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error - CleanUpOneUserAllMsgFromRedis(ctx context.Context, userID string) error + CleanUpOneUserAllMsg(ctx context.Context, userID string) error HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) @@ -231,27 +231,28 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl return r.rdb.HDel(context.Background(), key, fields...).Err() } -func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, errResult error) { +func (r *RedisClient) GetMessageListBySeq(ctx context.Context, userID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []uint32, err2 error) { for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) result, err := r.rdb.Get(context.Background(), key).Result() if err != nil { - errResult = err + if err != redis.Nil { + err2 = err + } failedSeqList = append(failedSeqList, v) } else { msg := sdkws.MsgData{} err = jsonpb.UnmarshalString(result, &msg) if err != nil { - errResult = err + err2 = err failedSeqList = append(failedSeqList, v) } else { seqMsg = append(seqMsg, &msg) } - } } - return seqMsg, failedSeqList, errResult + return seqMsg, failedSeqList, err2 } func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ, uid string) (int, error) { @@ -285,7 +286,7 @@ func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, return nil } -func (r *RedisClient) CleanUpOneUserAllMsgFromRedis(ctx context.Context, userID string) error { +func (r *RedisClient) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { key := messageCache + userID + "_" + "*" vals, err := r.rdb.Keys(ctx, key).Result() if err == redis.Nil { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 50c8a3121..49f73afbf 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -41,7 +41,7 @@ type MsgInterface interface { // 通过seqList获取大群在db里面的消息 GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) // 删除用户所有消息/cache/db然后重置seq - CleanUpUserMsgFromMongo(ctx context.Context, userID string) error + CleanUpUserMsg(ctx context.Context, userID string) error // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) @@ -91,8 +91,8 @@ func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID stri return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs) } -func (m *MsgController) CleanUpUserMsgFromMongo(ctx context.Context, userID string) error { - return m.database.CleanUpUserMsgFromMongo(ctx, userID) +func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error { + return m.database.CleanUpUserMsg(ctx, userID) } func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID string, remainTime int64) error { @@ -121,7 +121,7 @@ type MsgDatabaseInterface interface { // 通过seqList获取大群在 mongo里面的消息 GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) // 删除用户所有消息/redis/mongo然后重置seq - CleanUpUserMsgFromMongo(ctx context.Context, userID string) error + CleanUpUserMsg(ctx context.Context, userID string) error // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) @@ -410,30 +410,53 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [ } func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) { - return db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion) + successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, userID, seqs) + if err != nil { + if err != redis.Nil { + prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) + log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs) + } + } + prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) + if len(failedSeqs) > 0 { + mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion) + if err != nil { + prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + return nil, err + } + prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) + successMsgs = append(successMsgs, mongoMsgs...) + } + return successMsgs, nil } func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []uint32) (seqMsg []*sdkws.MsgData, err error) { - return db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion) -} - -func (db *MsgDatabase) CleanUpUserMsgFromMongo(ctx context.Context, userID string) error { - maxSeq, err := db.msgCache.GetUserMaxSeq(ctx, userID) - if err == redis.Nil { - return nil - } + successMsgs, failedSeqs, err := db.msgCache.GetMessageListBySeq(ctx, groupID, seqs) if err != nil { - return err + if err != redis.Nil { + prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) + log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs) + } } - docIDs := db.msg.GetSeqDocIDList(userID, uint32(maxSeq)) - err = db.msgModel.Delete(ctx, docIDs) - if err == mongo.ErrNoDocuments { - return nil + prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) + if len(failedSeqs) > 0 { + mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion) + if err != nil { + prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + return nil, err + } + prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) + successMsgs = append(successMsgs, mongoMsgs...) } + return successMsgs, nil +} + +func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { + err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0) if err != nil { return err } - err = db.msgCache.SetUserMinSeq(ctx, userID, maxSeq) + err = db.msgCache.CleanUpOneUserAllMsg(ctx, userID) return utils.Wrap(err, "") }