You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/pkg/common/db/cache/redis.go

520 lines
18 KiB

2 years ago
package cache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
log2 "Open_IM/pkg/common/log"
2 years ago
pbChat "Open_IM/pkg/proto/msg"
pbRtc "Open_IM/pkg/proto/rtc"
pbCommon "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"errors"
"fmt"
2 years ago
"strconv"
"time"
2 years ago
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
)
const (
2 years ago
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
)
2 years ago
type Cache interface {
IncrUserSeq(uid string) (uint64, error)
GetUserMaxSeq(uid string) (uint64, error)
SetUserMaxSeq(uid string, maxSeq uint64) error
SetUserMinSeq(uid string, minSeq uint32) (err error)
GetUserMinSeq(uid string) (uint64, error)
SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error)
GetGroupUserMinSeq(groupID, userID string) (uint64, error)
}
// native redis operate
2 years ago
type RedisClient struct {
rdb redis.UniversalClient
}
func (r *RedisClient) InitRedis() {
var rdb redis.UniversalClient
2 years ago
var err error
2 years ago
ctx := context.Background()
2 years ago
if config.Config.Redis.EnableCluster {
2 years ago
rdb = redis.NewClusterClient(&redis.ClusterOptions{
2 years ago
Addrs: config.Config.Redis.DBAddress,
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
PoolSize: 50,
})
_, err = rdb.Ping(ctx).Result()
if err != nil {
fmt.Println("redis cluster failed address ", config.Config.Redis.DBAddress)
panic(err.Error() + " redis cluster " + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
}
} else {
2 years ago
rdb = redis.NewClient(&redis.Options{
2 years ago
Addr: config.Config.Redis.DBAddress[0],
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
})
_, err = rdb.Ping(ctx).Result()
if err != nil {
panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
}
}
2 years ago
r.rdb = rdb
2 years ago
}
2 years ago
func (r *RedisClient) GetClient() redis.UniversalClient {
return r.rdb
2 years ago
}
2 years ago
func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
2 years ago
return &RedisClient{rdb: rdb}
}
//Perform seq auto-increment operation of user messages
2 years ago
func (r *RedisClient) IncrUserSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
2 years ago
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
}
//Get the largest Seq
2 years ago
func (r *RedisClient) GetUserMaxSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
2 years ago
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
//set the largest Seq
2 years ago
func (r *RedisClient) SetUserMaxSeq(uid string, maxSeq uint64) error {
key := userIncrSeq + uid
2 years ago
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
//Set the user's minimum seq
2 years ago
func (r *RedisClient) SetUserMinSeq(uid string, minSeq uint32) (err error) {
key := userMinSeq + uid
2 years ago
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
//Get the smallest Seq
2 years ago
func (r *RedisClient) GetUserMinSeq(uid string) (uint64, error) {
key := userMinSeq + uid
2 years ago
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
2 years ago
func (r *RedisClient) SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
2 years ago
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
2 years ago
func (r *RedisClient) GetGroupUserMinSeq(groupID, userID string) (uint64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
2 years ago
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
2 years ago
func (r *RedisClient) GetGroupMaxSeq(groupID string) (uint64, error) {
2 years ago
key := groupMaxSeq + groupID
2 years ago
seq, err := r.rdb.Get(context.Background(), key).Result()
2 years ago
return uint64(utils.StringToInt(seq)), err
}
2 years ago
func (r *RedisClient) IncrGroupMaxSeq(groupID string) (uint64, error) {
2 years ago
key := groupMaxSeq + groupID
2 years ago
seq, err := r.rdb.Incr(context.Background(), key).Result()
2 years ago
return uint64(seq), err
}
2 years ago
func (r *RedisClient) SetGroupMaxSeq(groupID string, maxSeq uint64) error {
2 years ago
key := groupMaxSeq + groupID
2 years ago
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
2 years ago
}
2 years ago
func (r *RedisClient) SetGroupMinSeq(groupID string, minSeq uint32) error {
2 years ago
key := groupMinSeq + groupID
2 years ago
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
2 years ago
}
//Store userid and platform class to redis
2 years ago
func (r *RedisClient) AddTokenFlag(userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
log2.NewDebug("", "add token key is ", key)
2 years ago
return r.rdb.HSet(context.Background(), key, token, flag).Err()
}
2 years ago
func (r *RedisClient) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID
log2.NewDebug("", "get token key is ", key)
2 years ago
m, err := r.rdb.HGetAll(context.Background(), key).Result()
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, err
}
2 years ago
func (r *RedisClient) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
2 years ago
mm := make(map[string]interface{})
for k, v := range m {
mm[k] = v
}
2 years ago
return r.rdb.HSet(context.Background(), key, mm).Err()
}
2 years ago
2 years ago
func (r *RedisClient) DeleteTokenByUidPid(userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
2 years ago
return r.rdb.HDel(context.Background(), key, fields...).Err()
}
2 years ago
func (r *RedisClient) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) {
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
2 years ago
result, err := r.rdb.Get(context.Background(), key).Result()
2 years ago
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
2 years ago
log2.Debug(operationID, "redis get message error: ", err.Error(), v)
2 years ago
} else {
msg := pbCommon.MsgData{}
2 years ago
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
errResult = err
2 years ago
failedSeqList = append(failedSeqList, v)
2 years ago
log2.NewWarn(operationID, "Unmarshal err ", result, err.Error())
} else {
log2.NewDebug(operationID, "redis get msg is ", msg.String())
seqMsg = append(seqMsg, &msg)
}
2 years ago
}
}
return seqMsg, failedSeqList, errResult
}
2 years ago
func (r *RedisClient) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) (error, int) {
ctx := context.Background()
2 years ago
pipe := r.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
s, err := utils.Pb2String(msg.MsgData)
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg.MsgData.String(), uid, err.Error())
continue
}
log2.NewDebug(operationID, "convert string is ", s)
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
2 years ago
//err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, err.Error())
failedList = append(failedList, *msg)
}
}
if len(failedList) != 0 {
return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, operationID)), len(failedList)
}
_, err := pipe.Exec(ctx)
return err, 0
}
2 years ago
func (r *RedisClient) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
ctx := context.Background()
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
2 years ago
err := r.rdb.Del(ctx, key).Err()
2 years ago
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, uid, err.Error(), msgList)
}
}
2 years ago
return nil
}
2 years ago
func (r *RedisClient) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error {
ctx := context.Background()
key := messageCache + userID + "_" + "*"
2 years ago
vals, err := r.rdb.Keys(ctx, key).Result()
log2.Debug(operationID, "vals: ", vals)
2 years ago
if err == redis.Nil {
return nil
}
if err != nil {
return utils.Wrap(err, "")
}
2 years ago
for _, v := range vals {
2 years ago
err = r.rdb.Del(ctx, v).Err()
}
return nil
}
2 years ago
func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, pushToUserID string) (isSend bool, err error) {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
2 years ago
return false, err
}
//log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String())
var inviteeUserIDList []string
2 years ago
var isInviteSignal bool
switch signalInfo := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
2 years ago
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true
case *pbRtc.SignalReq_InviteInGroup:
2 years ago
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true
2 years ago
if !utils.IsContain(pushToUserID, inviteeUserIDList) {
return false, nil
}
2 years ago
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
2 years ago
return false, nil
default:
2 years ago
return false, nil
}
2 years ago
if isInviteSignal {
2 years ago
log2.NewDebug(operationID, utils.GetSelfFuncName(), "invite userID list:", inviteeUserIDList)
2 years ago
for _, userID := range inviteeUserIDList {
2 years ago
log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID:", userID)
2 years ago
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
2 years ago
return false, err
2 years ago
}
2 years ago
keyList := signalListCache + userID
2 years ago
err = r.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
2 years ago
if err != nil {
2 years ago
return false, err
2 years ago
}
2 years ago
err = r.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
2 years ago
if err != nil {
2 years ago
return false, err
2 years ago
}
2 years ago
key := signalCache + msg.ClientMsgID
2 years ago
err = r.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
2 years ago
if err != nil {
2 years ago
return false, err
2 years ago
}
}
}
2 years ago
return true, nil
}
2 years ago
func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
2 years ago
key := signalCache + clientMsgID
invitationInfo = &pbRtc.SignalInviteReq{}
2 years ago
bytes, err := r.rdb.Get(context.Background(), key).Bytes()
if err != nil {
return nil, err
}
req := &pbRtc.SignalReq{}
if err = proto.Unmarshal(bytes, req); err != nil {
return nil, err
}
switch req2 := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
invitationInfo.Invitation = req2.Invite.Invitation
2 years ago
invitationInfo.OpUserID = req2.Invite.OpUserID
case *pbRtc.SignalReq_InviteInGroup:
invitationInfo.Invitation = req2.InviteInGroup.Invitation
2 years ago
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
}
return invitationInfo, err
}
2 years ago
func (r *RedisClient) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
2 years ago
keyList := signalListCache + userID
2 years ago
result := r.rdb.LPop(context.Background(), keyList)
if err = result.Err(); err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
key, err := result.Result()
if err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
log2.NewDebug("", utils.GetSelfFuncName(), result, result.String())
2 years ago
invitationInfo, err = r.GetSignalInfoFromCacheByClientMsgID(key)
2 years ago
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
2 years ago
err = r.DelUserSignalList(userID)
2 years ago
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
return invitationInfo, nil
}
2 years ago
func (r *RedisClient) DelUserSignalList(userID string) error {
2 years ago
keyList := signalListCache + userID
2 years ago
err := r.rdb.Del(context.Background(), keyList).Err()
2 years ago
return err
}
2 years ago
func (r *RedisClient) DelMsgFromCache(uid string, seqList []uint32, operationID string) {
for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq))
2 years ago
result, err := r.rdb.Get(context.Background(), key).Result()
2 years ago
if err != nil {
2 years ago
if err == redis.Nil {
2 years ago
log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil")
} else {
log2.NewError(operationID, utils.GetSelfFuncName(), err.Error(), key)
}
continue
}
var msg pbCommon.MsgData
if err := utils.String2Pb(result, &msg); err != nil {
2 years ago
log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, result, key, err.Error())
continue
}
msg.Status = constant.MsgDeleted
s, err := utils.Pb2String(&msg)
if err != nil {
log2.Error(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg, err.Error())
continue
}
2 years ago
if err := r.rdb.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
log2.Error(operationID, utils.GetSelfFuncName(), "Set failed", err.Error())
}
}
}
2 years ago
2 years ago
func (r *RedisClient) SetGetuiToken(token string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err()
2 years ago
}
2 years ago
func (r *RedisClient) GetGetuiToken() (string, error) {
result, err := r.rdb.Get(context.Background(), getuiToken).Result()
2 years ago
return result, err
2 years ago
}
2 years ago
2 years ago
func (r *RedisClient) SetGetuiTaskID(taskID string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()
2 years ago
}
2 years ago
func (r *RedisClient) GetGetuiTaskID() (string, error) {
result, err := r.rdb.Get(context.Background(), getuiTaskID).Result()
2 years ago
return result, err
}
2 years ago
func (r *RedisClient) SetSendMsgStatus(status int32, operationID string) error {
return r.rdb.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err()
2 years ago
}
2 years ago
func (r *RedisClient) GetSendMsgStatus(operationID string) (int, error) {
result, err := r.rdb.Get(context.Background(), sendMsgFailedFlag+operationID).Result()
2 years ago
if err != nil {
return 0, err
}
status, err := strconv.Atoi(result)
return status, err
2 years ago
}
2 years ago
2 years ago
func (r *RedisClient) SetFcmToken(account string, platformID int, fcmToken string, expireTime int64) (err error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
2 years ago
return r.rdb.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err()
2 years ago
}
2 years ago
func (r *RedisClient) GetFcmToken(account string, platformID int) (string, error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
2 years ago
return r.rdb.Get(context.Background(), key).Result()
2 years ago
}
2 years ago
func (r *RedisClient) DelFcmToken(account string, platformID int) error {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
2 years ago
return r.rdb.Del(context.Background(), key).Err()
}
2 years ago
func (r *RedisClient) IncrUserBadgeUnreadCountSum(uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
2 years ago
seq, err := r.rdb.Incr(context.Background(), key).Result()
return int(seq), err
}
2 years ago
func (r *RedisClient) SetUserBadgeUnreadCountSum(uid string, value int) error {
key := userBadgeUnreadCountSum + uid
2 years ago
return r.rdb.Set(context.Background(), key, value, 0).Err()
}
2 years ago
func (r *RedisClient) GetUserBadgeUnreadCountSum(uid string) (int, error) {
2 years ago
key := userBadgeUnreadCountSum + uid
2 years ago
seq, err := r.rdb.Get(context.Background(), key).Result()
2 years ago
return utils.StringToInt(seq), err
}
2 years ago
func (r *RedisClient) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) {
2 years ago
key := getMessageReactionExPrefix(clientMsgID, sessionType)
2 years ago
n, err := r.rdb.Exists(context.Background(), key).Result()
2 years ago
if n > 0 {
return true, err
} else {
return false, err
}
}
2 years ago
func (r *RedisClient) GetOneMessageAllReactionList(clientMsgID string, sessionType int32) (map[string]string, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
2 years ago
return r.rdb.HGetAll(context.Background(), key).Result()
}
2 years ago
func (r *RedisClient) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
2 years ago
return r.rdb.HDel(context.Background(), key, subKey).Err()
}
2 years ago
func (r *RedisClient) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
2 years ago
return r.rdb.Expire(context.Background(), key, expiration).Result()
}
2 years ago
2 years ago
func (r *RedisClient) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) {
2 years ago
key := getMessageReactionExPrefix(clientMsgID, sessionType)
2 years ago
result, err := r.rdb.HGet(context.Background(), key, typeKey).Result()
2 years ago
return result, err
}
2 years ago
2 years ago
func (r *RedisClient) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error {
2 years ago
key := getMessageReactionExPrefix(clientMsgID, sessionType)
2 years ago
return r.rdb.HSet(context.Background(), key, typeKey, value).Err()
2 years ago
}
2 years ago
2 years ago
func (r *RedisClient) LockMessageTypeKey(clientMsgID string, TypeKey string) error {
2 years ago
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
2 years ago
return r.rdb.SetNX(context.Background(), key, 1, time.Minute).Err()
2 years ago
}
2 years ago
func (r *RedisClient) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error {
2 years ago
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
2 years ago
return r.rdb.Del(context.Background(), key).Err()
2 years ago
}
2 years ago
func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
2 years ago
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
case constant.GroupChatType:
return "EX_GROUP_" + clientMsgID
case constant.SuperGroupChatType:
return "EX_SUPER_GROUP_" + clientMsgID
case constant.NotificationChatType:
return "EX_NOTIFICATION" + clientMsgID
}
return ""
}