You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/pkg/common/db/cache/redis.go

565 lines
20 KiB

package cache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
log2 "Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/msg"
pbRtc "Open_IM/pkg/proto/rtc"
pbCommon "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
)
const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
userMinSeq = "REDIS_USER_MIN_SEQ:"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
FcmToken = "FCM_TOKEN:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
)
type Cache interface {
IncrUserSeq(uid string) (uint64, error)
GetUserMaxSeq(uid string) (uint64, error)
SetUserMaxSeq(uid string, maxSeq uint64) error
SetUserMinSeq(uid string, minSeq uint32) (err error)
GetUserMinSeq(uid string) (uint64, error)
SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error)
GetGroupUserMinSeq(groupID, userID string) (uint64, error)
}
// native redis operate
//func NewRedis() *RedisClient {
// o := &RedisClient{}
// o.InitRedis()
// return o
//}
func NewRedis() (*RedisClient, error) {
var rdb redis.UniversalClient
if config.Config.Redis.EnableCluster {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: config.Config.Redis.DBAddress,
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
PoolSize: 50,
})
//if err := rdb.Ping(ctx).Err();err != nil {
// return nil, fmt.Errorf("redis ping %w", err)
//}
//return &RedisClient{rdb: rdb}, nil
} else {
rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.DBAddress[0],
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
})
//err := rdb.Ping(ctx).Err()
//if err != nil {
// panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
//}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err := rdb.Ping(ctx).Err()
if err != nil {
return nil, fmt.Errorf("redis ping %w", err)
}
return &RedisClient{rdb: rdb}, nil
}
type RedisClient struct {
rdb redis.UniversalClient
}
func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
return &RedisClient{rdb: rdb}
}
//func (r *RedisClient) InitRedis() {
// var rdb redis.UniversalClient
// var err error
// ctx := context.Background()
// if config.Config.Redis.EnableCluster {
// rdb = redis.NewClusterClient(&redis.ClusterOptions{
// Addrs: config.Config.Redis.DBAddress,
// Username: config.Config.Redis.DBUserName,
// Password: config.Config.Redis.DBPassWord, // no password set
// PoolSize: 50,
// })
// _, err = rdb.Ping(ctx).Result()
// if err != nil {
// fmt.Println("redis cluster failed address ", config.Config.Redis.DBAddress)
// panic(err.Error() + " redis cluster " + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
// }
// } else {
// rdb = redis.NewClient(&redis.Options{
// Addr: config.Config.Redis.DBAddress[0],
// Username: config.Config.Redis.DBUserName,
// Password: config.Config.Redis.DBPassWord, // no password set
// DB: 0, // use default DB
// PoolSize: 100, // 连接池大小
// })
// _, err = rdb.Ping(ctx).Result()
// if err != nil {
// panic(err.Error() + " redis " + config.Config.Redis.DBAddress[0] + config.Config.Redis.DBUserName + config.Config.Redis.DBPassWord)
// }
// }
// r.rdb = rdb
//}
func (r *RedisClient) GetClient() redis.UniversalClient {
return r.rdb
}
//func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
// return &RedisClient{rdb: rdb}
//}
// Perform seq auto-increment operation of user messages
func (r *RedisClient) IncrUserSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
}
// Get the largest Seq
func (r *RedisClient) GetUserMaxSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
// set the largest Seq
func (r *RedisClient) SetUserMaxSeq(uid string, maxSeq uint64) error {
key := userIncrSeq + uid
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
// Set the user's minimum seq
func (r *RedisClient) SetUserMinSeq(uid string, minSeq uint32) (err error) {
key := userMinSeq + uid
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
// Get the smallest Seq
func (r *RedisClient) GetUserMinSeq(uid string) (uint64, error) {
key := userMinSeq + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
func (r *RedisClient) SetGroupUserMinSeq(groupID, userID string, minSeq uint64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
func (r *RedisClient) GetGroupUserMinSeq(groupID, userID string) (uint64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
func (r *RedisClient) GetGroupMaxSeq(groupID string) (uint64, error) {
key := groupMaxSeq + groupID
seq, err := r.rdb.Get(context.Background(), key).Result()
return uint64(utils.StringToInt(seq)), err
}
func (r *RedisClient) IncrGroupMaxSeq(groupID string) (uint64, error) {
key := groupMaxSeq + groupID
seq, err := r.rdb.Incr(context.Background(), key).Result()
return uint64(seq), err
}
func (r *RedisClient) SetGroupMaxSeq(groupID string, maxSeq uint64) error {
key := groupMaxSeq + groupID
return r.rdb.Set(context.Background(), key, maxSeq, 0).Err()
}
func (r *RedisClient) SetGroupMinSeq(groupID string, minSeq uint32) error {
key := groupMinSeq + groupID
return r.rdb.Set(context.Background(), key, minSeq, 0).Err()
}
// Store userid and platform class to redis
func (r *RedisClient) AddTokenFlag(userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
log2.NewDebug("", "add token key is ", key)
return r.rdb.HSet(context.Background(), key, token, flag).Err()
}
func (r *RedisClient) GetTokenMapByUidPid(userID, platformID string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID
log2.NewDebug("", "get token key is ", key)
m, err := r.rdb.HGetAll(context.Background(), key).Result()
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, err
}
func (r *RedisClient) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
mm := make(map[string]interface{})
for k, v := range m {
mm[k] = v
}
return r.rdb.HSet(context.Background(), key, mm).Err()
}
func (r *RedisClient) DeleteTokenByUidPid(userID string, platformID int, fields []string) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return r.rdb.HDel(context.Background(), key, fields...).Err()
}
func (r *RedisClient) GetMessageListBySeq(userID string, seqList []uint32, operationID string) (seqMsg []*pbCommon.MsgData, failedSeqList []uint32, errResult error) {
for _, v := range seqList {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := r.rdb.Get(context.Background(), key).Result()
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
log2.Debug(operationID, "redis get message error: ", err.Error(), v)
} else {
msg := pbCommon.MsgData{}
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
log2.NewWarn(operationID, "Unmarshal err ", result, err.Error())
} else {
log2.NewDebug(operationID, "redis get msg is ", msg.String())
seqMsg = append(seqMsg, &msg)
}
}
}
return seqMsg, failedSeqList, errResult
}
func (r *RedisClient) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) (error, int) {
ctx := context.Background()
pipe := r.rdb.Pipeline()
var failedList []pbChat.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
s, err := utils.Pb2String(msg.MsgData)
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg.MsgData.String(), uid, err.Error())
continue
}
log2.NewDebug(operationID, "convert string is ", s)
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
//err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, err.Error())
failedList = append(failedList, *msg)
}
}
if len(failedList) != 0 {
return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, operationID)), len(failedList)
}
_, err := pipe.Exec(ctx)
return err, 0
}
func (r *RedisClient) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
ctx := context.Background()
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
err := r.rdb.Del(ctx, key).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, uid, err.Error(), msgList)
}
}
return nil
}
func (r *RedisClient) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error {
ctx := context.Background()
key := messageCache + userID + "_" + "*"
vals, err := r.rdb.Keys(ctx, key).Result()
log2.Debug(operationID, "vals: ", vals)
if err == redis.Nil {
return nil
}
if err != nil {
return utils.Wrap(err, "")
}
for _, v := range vals {
err = r.rdb.Del(ctx, v).Err()
}
return nil
}
func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData, pushToUserID string) (isSend bool, err error) {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, err
}
//log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String())
var inviteeUserIDList []string
var isInviteSignal bool
switch signalInfo := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true
case *pbRtc.SignalReq_InviteInGroup:
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true
if !utils.IsContain(pushToUserID, inviteeUserIDList) {
return false, nil
}
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return false, nil
default:
return false, nil
}
if isInviteSignal {
log2.NewDebug(operationID, utils.GetSelfFuncName(), "invite userID list:", inviteeUserIDList)
for _, userID := range inviteeUserIDList {
log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID:", userID)
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return false, err
}
keyList := signalListCache + userID
err = r.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
if err != nil {
return false, err
}
err = r.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, err
}
key := signalCache + msg.ClientMsgID
err = r.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, err
}
}
}
return true, nil
}
func (r *RedisClient) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
key := signalCache + clientMsgID
invitationInfo = &pbRtc.SignalInviteReq{}
bytes, err := r.rdb.Get(context.Background(), key).Bytes()
if err != nil {
return nil, err
}
req := &pbRtc.SignalReq{}
if err = proto.Unmarshal(bytes, req); err != nil {
return nil, err
}
switch req2 := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
invitationInfo.Invitation = req2.Invite.Invitation
invitationInfo.OpUserID = req2.Invite.OpUserID
case *pbRtc.SignalReq_InviteInGroup:
invitationInfo.Invitation = req2.InviteInGroup.Invitation
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
}
return invitationInfo, err
}
func (r *RedisClient) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
keyList := signalListCache + userID
result := r.rdb.LPop(context.Background(), keyList)
if err = result.Err(); err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
key, err := result.Result()
if err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
log2.NewDebug("", utils.GetSelfFuncName(), result, result.String())
invitationInfo, err = r.GetSignalInfoFromCacheByClientMsgID(key)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
err = r.DelUserSignalList(userID)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
return invitationInfo, nil
}
func (r *RedisClient) DelUserSignalList(userID string) error {
keyList := signalListCache + userID
err := r.rdb.Del(context.Background(), keyList).Err()
return err
}
func (r *RedisClient) DelMsgFromCache(uid string, seqList []uint32, operationID string) {
for _, seq := range seqList {
key := messageCache + uid + "_" + strconv.Itoa(int(seq))
result, err := r.rdb.Get(context.Background(), key).Result()
if err != nil {
if err == redis.Nil {
log2.NewDebug(operationID, utils.GetSelfFuncName(), err.Error(), "redis nil")
} else {
log2.NewError(operationID, utils.GetSelfFuncName(), err.Error(), key)
}
continue
}
var msg pbCommon.MsgData
if err := utils.String2Pb(result, &msg); err != nil {
log2.Error(operationID, utils.GetSelfFuncName(), "String2Pb failed", msg, result, key, err.Error())
continue
}
msg.Status = constant.MsgDeleted
s, err := utils.Pb2String(&msg)
if err != nil {
log2.Error(operationID, utils.GetSelfFuncName(), "Pb2String failed", msg, err.Error())
continue
}
if err := r.rdb.Set(context.Background(), key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
log2.Error(operationID, utils.GetSelfFuncName(), "Set failed", err.Error())
}
}
}
func (r *RedisClient) SetGetuiToken(token string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiToken, token, time.Duration(expireTime)*time.Second).Err()
}
func (r *RedisClient) GetGetuiToken() (string, error) {
result, err := r.rdb.Get(context.Background(), getuiToken).Result()
return result, err
}
func (r *RedisClient) SetGetuiTaskID(taskID string, expireTime int64) error {
return r.rdb.Set(context.Background(), getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()
}
func (r *RedisClient) GetGetuiTaskID() (string, error) {
result, err := r.rdb.Get(context.Background(), getuiTaskID).Result()
return result, err
}
func (r *RedisClient) SetSendMsgStatus(status int32, operationID string) error {
return r.rdb.Set(context.Background(), sendMsgFailedFlag+operationID, status, time.Hour*24).Err()
}
func (r *RedisClient) GetSendMsgStatus(operationID string) (int, error) {
result, err := r.rdb.Get(context.Background(), sendMsgFailedFlag+operationID).Result()
if err != nil {
return 0, err
}
status, err := strconv.Atoi(result)
return status, err
}
func (r *RedisClient) SetFcmToken(account string, platformID int, fcmToken string, expireTime int64) (err error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return r.rdb.Set(context.Background(), key, fcmToken, time.Duration(expireTime)*time.Second).Err()
}
func (r *RedisClient) GetFcmToken(account string, platformID int) (string, error) {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return r.rdb.Get(context.Background(), key).Result()
}
func (r *RedisClient) DelFcmToken(account string, platformID int) error {
key := FcmToken + account + ":" + strconv.Itoa(platformID)
return r.rdb.Del(context.Background(), key).Err()
}
func (r *RedisClient) IncrUserBadgeUnreadCountSum(uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := r.rdb.Incr(context.Background(), key).Result()
return int(seq), err
}
func (r *RedisClient) SetUserBadgeUnreadCountSum(uid string, value int) error {
key := userBadgeUnreadCountSum + uid
return r.rdb.Set(context.Background(), key, value, 0).Err()
}
func (r *RedisClient) GetUserBadgeUnreadCountSum(uid string) (int, error) {
key := userBadgeUnreadCountSum + uid
seq, err := r.rdb.Get(context.Background(), key).Result()
return utils.StringToInt(seq), err
}
func (r *RedisClient) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
n, err := r.rdb.Exists(context.Background(), key).Result()
if n > 0 {
return true, err
} else {
return false, err
}
}
func (r *RedisClient) GetOneMessageAllReactionList(clientMsgID string, sessionType int32) (map[string]string, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HGetAll(context.Background(), key).Result()
}
func (r *RedisClient) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HDel(context.Background(), key, subKey).Err()
}
func (r *RedisClient) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.Expire(context.Background(), key, expiration).Result()
}
func (r *RedisClient) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
result, err := r.rdb.HGet(context.Background(), key, typeKey).Result()
return result, err
}
func (r *RedisClient) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error {
key := getMessageReactionExPrefix(clientMsgID, sessionType)
return r.rdb.HSet(context.Background(), key, typeKey, value).Err()
}
func (r *RedisClient) LockMessageTypeKey(clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return r.rdb.SetNX(context.Background(), key, 1, time.Minute).Err()
}
func (r *RedisClient) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return r.rdb.Del(context.Background(), key).Err()
}
func (r *RedisClient) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
case constant.GroupChatType:
return "EX_GROUP_" + clientMsgID
case constant.SuperGroupChatType:
return "EX_SUPER_GROUP_" + clientMsgID
case constant.NotificationChatType:
return "EX_NOTIFICATION" + clientMsgID
}
return ""
}