seq pull change

pull/103/head
Gordon 3 years ago
parent 90cde99f06
commit c9f326c722

@ -9,6 +9,7 @@ const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
lastGetSeq = "LAST_GET_SEQ"
userMinSeq = "REDIS_USER_Min_SEQ:"
)
func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {
@ -37,12 +38,25 @@ func (d *DataBases) IncrUserSeq(uid string) (int64, error) {
return redis.Int64(d.Exec("INCR", key))
}
//获取最新的seq
func (d *DataBases) GetUserSeq(uid string) (int64, error) {
//获取最大的Seq
func (d *DataBases) GetUserMaxSeq(uid string) (int64, error) {
key := userIncrSeq + uid
return redis.Int64(d.Exec("GET", key))
}
//设置用户最小的seq
func (d *DataBases) SetUserMinSeq(uid string) (err error) {
key := userMinSeq + uid
_, err = d.Exec("SET", key)
return err
}
//获取最小的Seq
func (d *DataBases) GetUserMinSeq(uid string) (int64, error) {
key := userMinSeq + uid
return redis.Int64(d.Exec("GET", key))
}
//存储苹果的设备token到redis
func (d *DataBases) SetAppleDeviceToken(accountAddress, value string) (err error) {
key := appleDeviceToken + accountAddress

@ -68,9 +68,10 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
log.NewInfo("", "goroutine num is ", runtime.NumGoroutine())
}
func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) {
var mReplyData pbWs.GetNewSeqResp
mReplyData.Seq = pb.GetSeq()
func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) {
var mReplyData pbWs.GetMaxAndMinSeqResp
mReplyData.MaxSeq = pb.GetMaxSeq()
mReplyData.MinSeq = pb.GetMinSeq()
b, _ := proto.Marshal(&mReplyData)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
@ -84,7 +85,7 @@ func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqRes
}
func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m)
pbData := pbChat.GetNewSeqReq{}
pbData := pbChat.GetMaxAndMinSeqReq{}
pbData.UserID = m.SendID
pbData.OperationID = m.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
@ -92,7 +93,7 @@ func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m)
}
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
return

@ -13,22 +13,30 @@ import (
pbMsg "Open_IM/src/proto/chat"
)
func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) {
log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String())
func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) {
log.InfoByKv("rpc getMaxAndMinSeq is arriving", in.OperationID, in.String())
//seq, err := model.GetBiggestSeqFromReceive(in.UserID)
seq, err := commonDB.DB.GetUserSeq(in.UserID)
resp := new(pbMsg.GetNewSeqResp)
if err == nil {
resp.Seq = seq
maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID)
minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID)
resp := new(pbMsg.GetMaxAndMinSeqResp)
if err1 == nil && err2 == nil {
resp.MaxSeq = maxSeq
resp.MinSeq = minSeq
resp.ErrCode = 0
resp.ErrMsg = ""
return resp, err
return resp, nil
} else {
if err == redis.ErrNil {
resp.Seq = 0
} else {
log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error())
resp.Seq = -1
if err1 == redis.ErrNil {
resp.MaxSeq = 0
} else if err1 != nil {
log.NewInfo(in.OperationID, "getMaxSeq from redis error", in.String(), err1.Error())
resp.MaxSeq = -1
}
if err2 == redis.ErrNil {
resp.MinSeq = 0
} else if err2 != nil {
log.NewInfo(in.OperationID, "getMinSeq from redis error", in.String(), err2.Error())
resp.MinSeq = -1
}
resp.ErrCode = 0
resp.ErrMsg = ""
@ -81,7 +89,6 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *pbMsg.PullMessag
SingleUserMsg: respSingleMsgFormat,
GroupUserMsg: respGroupMsgFormat,
}, nil
panic("implement me")
}
func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat {
var userid string

Loading…
Cancel
Save