diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index 819a9592e..1811d3847 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -40,18 +40,28 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) resp := new(open_im_sdk.PullMessageBySeqListResp) //msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID) - msgList, err := commonDB.DB.GetMsgBySeqListMongo2(in.UserID, in.SeqList, in.OperationID) + redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID) if err != nil { - log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) - resp.ErrCode = 201 - resp.ErrMsg = err.Error() - return resp, nil + if err != redis.ErrNil { + log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) + } else { + log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) + } + msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(in.UserID, failedSeqList, in.OperationID) + if err1 != nil { + log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) + resp.ErrCode = 201 + resp.ErrMsg = err.Error() + return resp, nil + } else { + redisMsgList = append(redisMsgList, msgList...) + resp.List = redisMsgList + } + } else { + resp.List = redisMsgList } //respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID) //respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat) - resp.ErrCode = 0 - resp.ErrMsg = "" - resp.List = msgList return resp, nil } diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index 93ac28d80..2596038a0 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -263,6 +263,29 @@ func (d *DataBases) GetGroupMemberIDListFromCache(groupID string) ([]string, err result, err := redis.Strings(d.Exec("SMEMBERS", groupCache+groupID)) return result, err } +func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { + for _, v := range seqList { + key := messageCache + userID + "_" + strconv.Itoa(int(v)) + result, err := redis.String(d.Exec("HGETALL", key)) + if err != nil { + if err != redis.ErrNil { + errResult = err + } + failedSeqList = append(failedSeqList, v) + } else { + msg := pbCommon.MsgData{} + err = json.Unmarshal([]byte(result), &msg) + if err != nil { + failedSeqList = append(failedSeqList, v) + log2.NewWarn(operationID, "Unmarshal err", result, err.Error()) + } else { + seqMsg = append(seqMsg, &msg) + } + + } + } + return seqMsg, failedSeqList, errResult +} func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string) (err error) { var failedList []pbChat.MsgDataToMQ