From c9f326c72232bec8c74f019133f31db156f1b80d Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 26 Oct 2021 10:59:59 +0800 Subject: [PATCH] seq pull change --- src/common/db/redisModel.go | 18 +++++++++++++++-- src/msg_gateway/gate/logic.go | 11 ++++++----- src/rpc/chat/chat/pull_message.go | 33 +++++++++++++++++++------------ 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/src/common/db/redisModel.go b/src/common/db/redisModel.go index f86dbb638..385165687 100644 --- a/src/common/db/redisModel.go +++ b/src/common/db/redisModel.go @@ -9,6 +9,7 @@ const ( userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq appleDeviceToken = "DEVICE_TOKEN" lastGetSeq = "LAST_GET_SEQ" + userMinSeq = "REDIS_USER_Min_SEQ:" ) func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) { @@ -37,12 +38,25 @@ func (d *DataBases) IncrUserSeq(uid string) (int64, error) { return redis.Int64(d.Exec("INCR", key)) } -//获取最新的seq -func (d *DataBases) GetUserSeq(uid string) (int64, error) { +//获取最大的Seq +func (d *DataBases) GetUserMaxSeq(uid string) (int64, error) { key := userIncrSeq + uid return redis.Int64(d.Exec("GET", key)) } +//设置用户最小的seq +func (d *DataBases) SetUserMinSeq(uid string) (err error) { + key := userMinSeq + uid + _, err = d.Exec("SET", key) + return err +} + +//获取最小的Seq +func (d *DataBases) GetUserMinSeq(uid string) (int64, error) { + key := userMinSeq + uid + return redis.Int64(d.Exec("GET", key)) +} + //存储苹果的设备token到redis func (d *DataBases) SetAppleDeviceToken(accountAddress, value string) (err error) { key := appleDeviceToken + accountAddress diff --git a/src/msg_gateway/gate/logic.go b/src/msg_gateway/gate/logic.go index 9ce7db745..0d1a2b5e9 100644 --- a/src/msg_gateway/gate/logic.go +++ b/src/msg_gateway/gate/logic.go @@ -68,9 +68,10 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { log.NewInfo("", "goroutine num is ", runtime.NumGoroutine()) } -func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) { - var mReplyData pbWs.GetNewSeqResp - mReplyData.Seq = pb.GetSeq() +func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) { + var mReplyData pbWs.GetMaxAndMinSeqResp + mReplyData.MaxSeq = pb.GetMaxSeq() + mReplyData.MinSeq = pb.GetMinSeq() b, _ := proto.Marshal(&mReplyData) mReply := Resp{ ReqIdentifier: m.ReqIdentifier, @@ -84,7 +85,7 @@ func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqRes } func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) { log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m) - pbData := pbChat.GetNewSeqReq{} + pbData := pbChat.GetMaxAndMinSeqReq{} pbData.UserID = m.SendID pbData.OperationID = m.OperationID grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) @@ -92,7 +93,7 @@ func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) { log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m) } msgClient := pbChat.NewChatClient(grpcConn) - reply, err := msgClient.GetNewSeq(context.Background(), &pbData) + reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData) if err != nil { log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String()) return diff --git a/src/rpc/chat/chat/pull_message.go b/src/rpc/chat/chat/pull_message.go index 359b3709f..884e1dd9a 100644 --- a/src/rpc/chat/chat/pull_message.go +++ b/src/rpc/chat/chat/pull_message.go @@ -13,22 +13,30 @@ import ( pbMsg "Open_IM/src/proto/chat" ) -func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) { - log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String()) +func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) { + log.InfoByKv("rpc getMaxAndMinSeq is arriving", in.OperationID, in.String()) //seq, err := model.GetBiggestSeqFromReceive(in.UserID) - seq, err := commonDB.DB.GetUserSeq(in.UserID) - resp := new(pbMsg.GetNewSeqResp) - if err == nil { - resp.Seq = seq + maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID) + minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID) + resp := new(pbMsg.GetMaxAndMinSeqResp) + if err1 == nil && err2 == nil { + resp.MaxSeq = maxSeq + resp.MinSeq = minSeq resp.ErrCode = 0 resp.ErrMsg = "" - return resp, err + return resp, nil } else { - if err == redis.ErrNil { - resp.Seq = 0 - } else { - log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error()) - resp.Seq = -1 + if err1 == redis.ErrNil { + resp.MaxSeq = 0 + } else if err1 != nil { + log.NewInfo(in.OperationID, "getMaxSeq from redis error", in.String(), err1.Error()) + resp.MaxSeq = -1 + } + if err2 == redis.ErrNil { + resp.MinSeq = 0 + } else if err2 != nil { + log.NewInfo(in.OperationID, "getMinSeq from redis error", in.String(), err2.Error()) + resp.MinSeq = -1 } resp.ErrCode = 0 resp.ErrMsg = "" @@ -81,7 +89,6 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *pbMsg.PullMessag SingleUserMsg: respSingleMsgFormat, GroupUserMsg: respGroupMsgFormat, }, nil - panic("implement me") } func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat { var userid string