From d7961bf22679db166377748b32eea69fd1dcb3b1 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 15 Jun 2022 16:33:30 +0800 Subject: [PATCH] redis replace to go_redis --- pkg/common/db/newRedisModel.go | 90 ++++++++- pkg/common/db/redisModel.go | 315 ++++++++++++++----------------- pkg/common/db/redisModel_test.go | 32 +++- 3 files changed, 254 insertions(+), 183 deletions(-) diff --git a/pkg/common/db/newRedisModel.go b/pkg/common/db/newRedisModel.go index 117e413c8..9c279b69f 100644 --- a/pkg/common/db/newRedisModel.go +++ b/pkg/common/db/newRedisModel.go @@ -25,6 +25,92 @@ import ( //func (d * DataBases)pubMessage(channel, msg string) { // d.rdb.Publish(context.Background(),channel,msg) //} +func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) { + key := accountTempCode + account + n, err := d.rdb.Exists(context.Background(), key).Result() + if n > 0 { + return true, err + } else { + return false, err + } +} +func (d *DataBases) SetAccountCode(account string, code, ttl int) (err error) { + key := accountTempCode + account + return d.rdb.Set(context.Background(), key, code, time.Duration(ttl)*time.Second).Err() +} +func (d *DataBases) GetAccountCode(account string) (string, error) { + key := accountTempCode + account + return d.rdb.Get(context.Background(), key).Result() +} + +//Perform seq auto-increment operation of user messages +func (d *DataBases) IncrUserSeq(uid string) (uint64, error) { + key := userIncrSeq + uid + seq, err := d.rdb.Incr(context.Background(), key).Result() + return uint64(seq), err +} + +//Get the largest Seq +func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) { + key := userIncrSeq + uid + seq, err := d.rdb.Get(context.Background(), key).Result() + return uint64(utils.StringToInt(seq)), err +} + +//set the largest Seq +func (d *DataBases) SetUserMaxSeq(uid string, maxSeq uint64) error { + key := userIncrSeq + uid + return d.rdb.Set(context.Background(), key, maxSeq, 0).Err() +} + +//Set the user's minimum seq +func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) { + key := userMinSeq + uid + return d.rdb.Set(context.Background(), key, minSeq, 0).Err() +} + +//Get the smallest Seq +func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) { + key := userMinSeq + uid + seq, err := d.rdb.Get(context.Background(), key).Result() + return uint64(utils.StringToInt(seq)), err +} + +//Store userid and platform class to redis +func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + log2.NewDebug("", "add token key is ", key) + return d.rdb.HSet(context.Background(), key, token, flag).Err() +} + +func (d *DataBases) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) { + key := uidPidToken + userID + ":" + platformID + log2.NewDebug("", "get token key is ", key) + m, err := d.rdb.HGetAll(context.Background(), key).Result() + mm := make(map[string]int) + for k, v := range m { + mm[k] = utils.StringToInt(v) + } + return mm, err +} +func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return d.rdb.HMSet(context.Background(), key, m).Err() +} +func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int, fields []string) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return d.rdb.HDel(context.Background(), key, fields...).Err() +} +func (d *DataBases) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error { + key := conversationReceiveMessageOpt + userID + return d.rdb.HSet(context.Background(), key, conversationID, opt).Err() +} + +func (d *DataBases) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) { + key := conversationReceiveMessageOpt + userID + result, err := d.rdb.HGet(context.Background(), key, conversationID).Result() + return utils.StringToInt(result), err +} func (d *DataBases) SetUserGlobalMsgRecvOpt(userID string, opt int32) error { key := conversationReceiveMessageOpt + userID return d.rdb.HSet(context.Background(), key, GlobalMsgRecvOpt, opt).Err() @@ -41,7 +127,7 @@ func (d *DataBases) GetUserGlobalMsgRecvOpt(userID string) (int, error) { } return utils.StringToInt(result), err } -func (d *DataBases) NewGetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { +func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) @@ -67,7 +153,7 @@ func (d *DataBases) NewGetMessageListBySeq(userID string, seqList []uint32, oper } return seqMsg, failedSeqList, errResult } -func (d *DataBases) NewSetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { +func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { ctx := context.Background() var failedList []pbChat.MsgDataToMQ for _, msg := range msgList { diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index f2344a800..59fd7ac72 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -1,19 +1,7 @@ package db import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" log2 "Open_IM/pkg/common/log" - pbChat "Open_IM/pkg/proto/chat" - pbCommon "Open_IM/pkg/proto/sdk_ws" - "Open_IM/pkg/utils" - "errors" - "fmt" - - "github.com/garyburd/redigo/redis" - "github.com/golang/protobuf/jsonpb" - //osconfig "google.golang.org/genproto/googleapis/cloud/osconfig/v1alpha" - "strconv" ) const ( @@ -54,122 +42,101 @@ func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (inte return con.Do(cmd, params...) } -func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) { - key := accountTempCode + account - return redis.Bool(d.Exec("EXISTS", key)) -} -func (d *DataBases) SetAccountCode(account string, code, ttl int) (err error) { - key := accountTempCode + account - _, err = d.Exec("SET", key, code, "ex", ttl) - return err -} -func (d *DataBases) GetAccountCode(account string) (string, error) { - key := accountTempCode + account - return redis.String(d.Exec("GET", key)) -} - -//Perform seq auto-increment operation of user messages -func (d *DataBases) IncrUserSeq(uid string) (uint64, error) { - key := userIncrSeq + uid - return redis.Uint64(d.Exec("INCR", key)) -} - -//Get the largest Seq -func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) { - key := userIncrSeq + uid - return redis.Uint64(d.Exec("GET", key)) -} - -//set the largest Seq -func (d *DataBases) SetUserMaxSeq(uid string, maxSeq uint64) error { - key := userIncrSeq + uid - _, err := d.Exec("SET", key, maxSeq) - return err -} - -//Set the user's minimum seq -func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) { - key := userMinSeq + uid - _, err = d.Exec("SET", key, minSeq) - return err -} - -//Get the smallest Seq -func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) { - key := userMinSeq + uid - return redis.Uint64(d.Exec("GET", key)) -} - -//Store Apple's device token to redis -func (d *DataBases) SetAppleDeviceToken(accountAddress, value string) (err error) { - key := appleDeviceToken + accountAddress - _, err = d.Exec("SET", key, value) - return err -} - -//Delete Apple device token -func (d *DataBases) DelAppleDeviceToken(accountAddress string) (err error) { - key := appleDeviceToken + accountAddress - _, err = d.Exec("DEL", key) - return err -} - -//Store userid and platform class to redis -func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error { - key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - log2.NewDebug("", "add token key is ", key) - _, err1 := d.Exec("HSet", key, token, flag) - return err1 -} - -func (d *DataBases) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) { - key := uidPidToken + userID + ":" + platformID - log2.NewDebug("", "get token key is ", key) - return redis.IntMap(d.Exec("HGETALL", key)) -} -func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error { - key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - _, err := d.Exec("hmset", key, redis.Args{}.Add().AddFlat(m)...) - return err -} -func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int, fields []string) error { - key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - _, err := d.Exec("HDEL", key, redis.Args{}.Add().AddFlat(fields)...) - return err -} -func (d *DataBases) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error { - key := conversationReceiveMessageOpt + userID - _, err := d.Exec("HSet", key, conversationID, opt) - return err -} - -func (d *DataBases) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) { - key := conversationReceiveMessageOpt + userID - return redis.Int(d.Exec("HGet", key, conversationID)) -} -func (d *DataBases) GetAllConversationMsgOpt(userID string) (map[string]int, error) { - key := conversationReceiveMessageOpt + userID - return redis.IntMap(d.Exec("HGETALL", key)) -} -func (d *DataBases) SetMultiConversationMsgOpt(userID string, m map[string]int) error { - key := conversationReceiveMessageOpt + userID - _, err := d.Exec("hmset", key, redis.Args{}.Add().AddFlat(m)...) - return err -} -func (d *DataBases) GetMultiConversationMsgOpt(userID string, conversationIDs []string) (m map[string]int, err error) { - m = make(map[string]int) - key := conversationReceiveMessageOpt + userID - i, err := redis.Ints(d.Exec("hmget", key, redis.Args{}.Add().AddFlat(conversationIDs)...)) - if err != nil { - return m, err - } - for k, v := range conversationIDs { - m[v] = i[k] - } - return m, nil - -} +//func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) { +// key := accountTempCode + account +// return redis.Bool(d.Exec("EXISTS", key)) +//} +//func (d *DataBases) SetAccountCode(account string, code, ttl int) (err error) { +// key := accountTempCode + account +// _, err = d.Exec("SET", key, code, "ex", ttl) +// return err +//} +//func (d *DataBases) GetAccountCode(account string) (string, error) { +// key := accountTempCode + account +// return redis.String(d.Exec("GET", key)) +//} +// +////Perform seq auto-increment operation of user messages +//func (d *DataBases) IncrUserSeq(uid string) (uint64, error) { +// key := userIncrSeq + uid +// return redis.Uint64(d.Exec("INCR", key)) +//} +// +////Get the largest Seq +//func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) { +// key := userIncrSeq + uid +// return redis.Uint64(d.Exec("GET", key)) +//} +// +////set the largest Seq +//func (d *DataBases) SetUserMaxSeq(uid string, maxSeq uint64) error { +// key := userIncrSeq + uid +// _, err := d.Exec("SET", key, maxSeq) +// return err +//} +// +////Set the user's minimum seq +//func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) { +// key := userMinSeq + uid +// _, err = d.Exec("SET", key, minSeq) +// return err +//} +// +////Get the smallest Seq +//func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) { +// key := userMinSeq + uid +// return redis.Uint64(d.Exec("GET", key)) +//} +// +// +////Store userid and platform class to redis +//func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error { +// key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) +// log2.NewDebug("", "add token key is ", key) +// _, err1 := d.Exec("HSet", key, token, flag) +// return err1 +//} +// +//func (d *DataBases) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) { +// key := uidPidToken + userID + ":" + platformID +// log2.NewDebug("", "get token key is ", key) +// return redis.IntMap(d.Exec("HGETALL", key)) +//} +//func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error { +// key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) +// _, err := d.Exec("hmset", key, redis.Args{}.Add().AddFlat(m)...) +// return err +//} +//func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int, fields []string) error { +// key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) +// _, err := d.Exec("HDEL", key, redis.Args{}.Add().AddFlat(fields)...) +// return err +//} +// +//func (d *DataBases) SetSingleConversationRecvMsgOpt(userID, conversationID string, opt int32) error { +// key := conversationReceiveMessageOpt + userID +// _, err := d.Exec("HSet", key, conversationID, opt) +// return err +//} +// +//func (d *DataBases) GetSingleConversationRecvMsgOpt(userID, conversationID string) (int, error) { +// key := conversationReceiveMessageOpt + userID +// return redis.Int(d.Exec("HGet", key, conversationID)) +//} +//func (d *DataBases) GetMultiConversationMsgOpt(userID string, conversationIDs []string) (m map[string]int, err error) { +// m = make(map[string]int) +// key := conversationReceiveMessageOpt + userID +// i, err := redis.Ints(d.Exec("hmget", key, redis.Args{}.Add().AddFlat(conversationIDs)...)) +// if err != nil { +// return m, err +// } +// for k, v := range conversationIDs { +// m[v] = i[k] +// } +// return m, nil +// +//} //func (d *DataBases) SetGetuiToken(token string, expireTime int64) error { // _, err := d.Exec("SET", getuiToken, token, "ex", expireTime) @@ -270,54 +237,54 @@ func (d *DataBases) SearchContentType() { // return result, err //} -func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { - for _, v := range seqList { - //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 - key := messageCache + userID + "_" + strconv.Itoa(int(v)) - - result, err := redis.String(d.Exec("GET", key)) - if err != nil { - errResult = err - failedSeqList = append(failedSeqList, v) - log2.NewWarn(operationID, "redis get message error:", err.Error(), v) - } else { - msg := pbCommon.MsgData{} - err = jsonpb.UnmarshalString(result, &msg) - if err != nil { - errResult = err - failedSeqList = append(failedSeqList, v) - log2.NewWarn(operationID, "Unmarshal err", result, err.Error()) - } else { - log2.NewDebug(operationID, "redis get msg is ", msg.String()) - seqMsg = append(seqMsg, &msg) - } - - } - } - return seqMsg, failedSeqList, errResult -} +//func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) { +// for _, v := range seqList { +// //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 +// key := messageCache + userID + "_" + strconv.Itoa(int(v)) +// +// result, err := redis.String(d.Exec("GET", key)) +// if err != nil { +// errResult = err +// failedSeqList = append(failedSeqList, v) +// log2.NewWarn(operationID, "redis get message error:", err.Error(), v) +// } else { +// msg := pbCommon.MsgData{} +// err = jsonpb.UnmarshalString(result, &msg) +// if err != nil { +// errResult = err +// failedSeqList = append(failedSeqList, v) +// log2.NewWarn(operationID, "Unmarshal err", result, err.Error()) +// } else { +// log2.NewDebug(operationID, "redis get msg is ", msg.String()) +// seqMsg = append(seqMsg, &msg) +// } +// +// } +// } +// return seqMsg, failedSeqList, errResult +//} -func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { - var failedList []pbChat.MsgDataToMQ - for _, msg := range msgList { - key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) - s, err := utils.Pb2String(msg.MsgData) - if err != nil { - log2.NewWarn(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg.MsgData.String(), uid, err.Error()) - continue - } - log2.NewDebug(operationID, "convert string is ", s) - _, err = d.Exec("SET", key, s, "ex", config.Config.MsgCacheTimeout) - if err != nil { - log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s) - failedList = append(failedList, *msg) - } - } - if len(failedList) != 0 { - return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q", failedList)) - } - return nil -} +//func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { +// var failedList []pbChat.MsgDataToMQ +// for _, msg := range msgList { +// key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) +// s, err := utils.Pb2String(msg.MsgData) +// if err != nil { +// log2.NewWarn(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg.MsgData.String(), uid, err.Error()) +// continue +// } +// log2.NewDebug(operationID, "convert string is ", s) +// _, err = d.Exec("SET", key, s, "ex", config.Config.MsgCacheTimeout) +// if err != nil { +// log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s) +// failedList = append(failedList, *msg) +// } +// } +// if len(failedList) != 0 { +// return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q", failedList)) +// } +// return nil +//} //func (d *DataBases) DelMsgFromCache(uid string, seqList []uint32, operationID string) { // for _, seq := range seqList { diff --git a/pkg/common/db/redisModel_test.go b/pkg/common/db/redisModel_test.go index d9d71b94a..f90cd99cd 100644 --- a/pkg/common/db/redisModel_test.go +++ b/pkg/common/db/redisModel_test.go @@ -25,11 +25,11 @@ func Test_GetTokenMapByUidPid(t *testing.T) { fmt.Println(m) } -func TestDataBases_GetMultiConversationMsgOpt(t *testing.T) { - m, err := DB.GetMultiConversationMsgOpt("fg", []string{"user", "age", "color"}) - assert.Nil(t, err) - fmt.Println(m) -} +//func TestDataBases_GetMultiConversationMsgOpt(t *testing.T) { +// m, err := DB.GetMultiConversationMsgOpt("fg", []string{"user", "age", "color"}) +// assert.Nil(t, err) +// fmt.Println(m) +//} func Test_GetKeyTTL(t *testing.T) { ctx := context.Background() key := flag.String("key", "key", "key value") @@ -70,7 +70,7 @@ func Test_NewSetMessageToCache(t *testing.T) { data.AtUserIDList = []string{"1212", "23232"} msg.MsgData = &data messageList := []*pbChat.MsgDataToMQ{&msg} - err := DB.NewSetMessageToCache(messageList, uid, "cacheTest") + err := DB.SetMessageToCache(messageList, uid, "cacheTest") assert.Nil(t, err) } @@ -82,7 +82,7 @@ func Test_NewGetMessageListBySeq(t *testing.T) { data.ClientMsgID = "23jwhjsdf" msg.MsgData = &data - seqMsg, failedSeqList, err := DB.NewGetMessageListBySeq(uid, []uint32{1212}, "cacheTest") + seqMsg, failedSeqList, err := DB.GetMessageListBySeq(uid, []uint32{1212}, "cacheTest") assert.Nil(t, err) fmt.Println(seqMsg, failedSeqList) @@ -100,3 +100,21 @@ func Test_GetUserGlobalMsgRecvOpt(t *testing.T) { assert.Nil(t, err) fmt.Println("get opt", opt) } +func Test_JudgeAccountEXISTS(t *testing.T) { + uid := "test_uid" + b, err := DB.JudgeAccountEXISTS(uid) + assert.Nil(t, err) + fmt.Println(b) +} +func Test_SetAccountCode(t *testing.T) { + uid := "test_uid" + code := 666666 + err := DB.SetAccountCode(uid, code, 100) + assert.Nil(t, err) +} +func Test_GetAccountCode(t *testing.T) { + uid := "test_uid" + code, err := DB.GetAccountCode(uid) + assert.Nil(t, err) + fmt.Println(code) +}