@ -16,13 +16,16 @@ package cache
import (
"context"
"errors"
"strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/dtm-labs/rockscache"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/OpenIMSDK/tools/errs"
"github.com/gogo/protobuf/jsonpb"
@ -33,7 +36,6 @@ import (
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/redis/go-redis/v9"
)
@ -105,11 +107,7 @@ type MsgModel interface {
GetTokensWithoutError ( ctx context . Context , userID string , platformID int ) ( map [ string ] int , error )
SetTokenMapByUidPid ( ctx context . Context , userID string , platformID int , m map [ string ] int ) error
DeleteTokenByUidPid ( ctx context . Context , userID string , platformID int , fields [ ] string ) error
GetMessagesBySeq (
ctx context . Context ,
conversationID string ,
seqs [ ] int64 ,
) ( seqMsg [ ] * sdkws . MsgData , failedSeqList [ ] int64 , err error )
GetMessagesBySeq ( ctx context . Context , conversationID string , seqs [ ] int64 ) ( seqMsg [ ] * sdkws . MsgData , failedSeqList [ ] int64 , err error )
SetMessageToCache ( ctx context . Context , conversationID string , msgs [ ] * sdkws . MsgData ) ( int , error )
UserDeleteMsgs ( ctx context . Context , conversationID string , seqs [ ] int64 , userID string ) error
DelUserDeleteMsgsList ( ctx context . Context , conversationID string , seqs [ ] int64 )
@ -122,12 +120,7 @@ type MsgModel interface {
JudgeMessageReactionExist ( ctx context . Context , clientMsgID string , sessionType int32 ) ( bool , error )
GetOneMessageAllReactionList ( ctx context . Context , clientMsgID string , sessionType int32 ) ( map [ string ] string , error )
DeleteOneMessageKey ( ctx context . Context , clientMsgID string , sessionType int32 , subKey string ) error
SetMessageReactionExpire (
ctx context . Context ,
clientMsgID string ,
sessionType int32 ,
expiration time . Duration ,
) ( bool , error )
SetMessageReactionExpire ( ctx context . Context , clientMsgID string , sessionType int32 , expiration time . Duration ) ( bool , error )
GetMessageTypeKeyValue ( ctx context . Context , clientMsgID string , sessionType int32 , typeKey string ) ( string , error )
SetMessageTypeKeyValue ( ctx context . Context , clientMsgID string , sessionType int32 , typeKey , value string ) error
LockMessageTypeKey ( ctx context . Context , clientMsgID string , TypeKey string ) error
@ -158,50 +151,51 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string
return hasReadSeq + userID + ":" + conversationID
}
func ( c * msgCache ) setSeq (
ctx context . Context ,
conversationID string ,
seq int64 ,
getkey func ( conversationID string ) string ,
) error {
func ( c * msgCache ) setSeq ( ctx context . Context , conversationID string , seq int64 , getkey func ( conversationID string ) string ) error {
return utils . Wrap1 ( c . rdb . Set ( ctx , getkey ( conversationID ) , seq , 0 ) . Err ( ) )
}
func ( c * msgCache ) getSeq (
ctx context . Context ,
conversationID string ,
getkey func ( conversationID string ) string ,
) ( int64 , error ) {
func ( c * msgCache ) getSeq ( ctx context . Context , conversationID string , getkey func ( conversationID string ) string ) ( int64 , error ) {
return utils . Wrap2 ( c . rdb . Get ( ctx , getkey ( conversationID ) ) . Int64 ( ) )
}
func ( c * msgCache ) getSeqs (
ctx context . Context ,
items [ ] string ,
getkey func ( s string ) string ,
) ( m map [ string ] int64 , err error ) {
pipe := c . rdb . Pipeline ( )
for _ , v := range items {
if err := pipe . Get ( ctx , getkey ( v ) ) . Err ( ) ; err != nil && err != redis . Nil {
return nil , errs . Wrap ( err )
}
}
result , err := pipe . Exec ( ctx )
if err != nil && err != redis . Nil {
return nil , errs . Wrap ( err )
}
func ( c * msgCache ) getSeqs ( ctx context . Context , items [ ] string , getkey func ( s string ) string ) ( m map [ string ] int64 , err error ) {
m = make ( map [ string ] int64 , len ( items ) )
for i , v := range result {
seq := v . ( * redis . StringCmd )
if s eq. E rr( ) != nil && s eq. E rr( ) != redis . Nil {
return nil , errs . Wrap ( v. Err ( ) )
for i , v := range items {
res , err := c . rdb . Get ( ctx , getkey ( v ) ) . Result ( )
if err != nil && err != redis . Nil {
return nil , errs . Wrap ( err )
}
val := utils . StringToInt64 ( seq. Val ( ) )
val := utils . StringToInt64 ( res )
if val != 0 {
m [ items [ i ] ] = val
}
}
return m , nil
//pipe := c.rdb.Pipeline()
//for _, v := range items {
// if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil {
// return nil, errs.Wrap(err)
// }
//}
//result, err := pipe.Exec(ctx)
//if err != nil && err != redis.Nil {
// return nil, errs.Wrap(err)
//}
//m = make(map[string]int64, len(items))
//for i, v := range result {
// seq := v.(*redis.StringCmd)
// if seq.Err() != nil && seq.Err() != redis.Nil {
// return nil, errs.Wrap(v.Err())
// }
// val := utils.StringToInt64(seq.Val())
// if val != 0 {
// m[items[i]] = val
// }
//}
//return m, nil
}
func ( c * msgCache ) SetMaxSeq ( ctx context . Context , conversationID string , maxSeq int64 ) error {
@ -221,15 +215,21 @@ func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq
}
func ( c * msgCache ) setSeqs ( ctx context . Context , seqs map [ string ] int64 , getkey func ( key string ) string ) error {
pipe := c . rdb . Pipeline ( )
for k , seq := range seqs {
err := pipe . Set ( ctx , getkey ( k ) , seq , 0 ) . Err ( )
if err != nil {
for conversationID , seq := range seqs {
if err := c . rdb . Set ( ctx , getkey ( conversationID ) , seq , 0 ) . Err ( ) ; err != nil {
return errs . Wrap ( err )
}
}
_ , err := pipe . Exec ( ctx )
return err
return nil
//pipe := c.rdb.Pipeline()
//for k, seq := range seqs {
// err := pipe.Set(ctx, getkey(k), seq, 0).Err()
// if err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return err
}
func ( c * msgCache ) SetMinSeqs ( ctx context . Context , seqs map [ string ] int64 ) error {
@ -252,30 +252,17 @@ func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID
return utils . Wrap2 ( c . rdb . Get ( ctx , c . getConversationUserMinSeqKey ( conversationID , userID ) ) . Int64 ( ) )
}
func ( c * msgCache ) GetConversationUserMinSeqs (
ctx context . Context ,
conversationID string ,
userIDs [ ] string ,
) ( m map [ string ] int64 , err error ) {
func ( c * msgCache ) GetConversationUserMinSeqs ( ctx context . Context , conversationID string , userIDs [ ] string ) ( m map [ string ] int64 , err error ) {
return c . getSeqs ( ctx , userIDs , func ( userID string ) string {
return c . getConversationUserMinSeqKey ( conversationID , userID )
} )
}
func ( c * msgCache ) SetConversationUserMinSeq (
ctx context . Context ,
conversationID string ,
userID string ,
minSeq int64 ,
) error {
func ( c * msgCache ) SetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string , minSeq int64 ) error {
return utils . Wrap1 ( c . rdb . Set ( ctx , c . getConversationUserMinSeqKey ( conversationID , userID ) , minSeq , 0 ) . Err ( ) )
}
func ( c * msgCache ) SetConversationUserMinSeqs (
ctx context . Context ,
conversationID string ,
seqs map [ string ] int64 ,
) ( err error ) {
func ( c * msgCache ) SetConversationUserMinSeqs ( ctx context . Context , conversationID string , seqs map [ string ] int64 ) ( err error ) {
return c . setSeqs ( ctx , seqs , func ( userID string ) string {
return c . getConversationUserMinSeqKey ( conversationID , userID )
} )
@ -303,11 +290,7 @@ func (c *msgCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasRea
} )
}
func ( c * msgCache ) GetHasReadSeqs (
ctx context . Context ,
userID string ,
conversationIDs [ ] string ,
) ( map [ string ] int64 , error ) {
func ( c * msgCache ) GetHasReadSeqs ( ctx context . Context , userID string , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
return c . getSeqs ( ctx , conversationIDs , func ( conversationID string ) string {
return c . getHasReadSeqKey ( conversationID , userID )
} )
@ -319,6 +302,7 @@ func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversatio
func ( c * msgCache ) AddTokenFlag ( ctx context . Context , userID string , platformID int , token string , flag int ) error {
key := uidPidToken + userID + ":" + constant . PlatformIDToName ( platformID )
return errs . Wrap ( c . rdb . HSet ( ctx , key , token , flag ) . Err ( ) )
}
@ -332,6 +316,7 @@ func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID string, pla
for k , v := range m {
mm [ k ] = utils . StringToInt ( v )
}
return mm , nil
}
@ -341,11 +326,13 @@ func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platf
for k , v := range m {
mm [ k ] = v
}
return errs . Wrap ( c . rdb . HSet ( ctx , key , mm ) . Err ( ) )
}
func ( c * msgCache ) DeleteTokenByUidPid ( ctx context . Context , userID string , platform int , fields [ ] string ) error {
key := uidPidToken + userID + ":" + constant . PlatformIDToName ( platform )
return errs . Wrap ( c . rdb . HDel ( ctx , key , fields ... ) . Err ( ) )
}
@ -357,58 +344,86 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string {
return messageCache + conversationID + "_*"
}
func ( c * msgCache ) GetMessagesBySeq (
ctx context . Context ,
conversationID string ,
seqs [ ] int64 ,
) ( seqMsgs [ ] * sdkws . MsgData , failedSeqs [ ] int64 , err error ) {
pipe := c . rdb . Pipeline ( )
for _ , v := range seqs {
// MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := c . getMessageCacheKey ( conversationID , v )
if err := pipe . Get ( ctx , key ) . Err ( ) ; err != nil && err != redis . Nil {
return nil , nil , err
func ( c * msgCache ) GetMessagesBySeq ( ctx context . Context , conversationID string , seqs [ ] int64 ) ( seqMsgs [ ] * sdkws . MsgData , failedSeqs [ ] int64 , err error ) {
for _ , seq := range seqs {
res , err := c . rdb . Get ( ctx , c . getMessageCacheKey ( conversationID , seq ) ) . Result ( )
if err != nil {
log . ZError ( ctx , "GetMessagesBySeq failed" , err , "conversationID" , conversationID , "seq" , seq )
failedSeqs = append ( failedSeqs , seq )
continue
}
}
result , err := pipe . Exec ( ctx )
for i , v := range result {
cmd := v . ( * redis . StringCmd )
if cmd . Err ( ) != nil {
failedSeqs = append ( failedSeqs , seqs [ i ] )
} else {
msg := sdkws . MsgData { }
err = msgprocessor . String2Pb ( cmd . Val ( ) , & msg )
if err == nil {
if msg . Status != constant . MsgDeleted {
seqMsgs = append ( seqMsgs , & msg )
continue
}
} else {
log . ZWarn ( ctx , "UnmarshalString failed" , err , "conversationID" , conversationID , "seq" , seqs [ i ] , "msg" , cmd . Val ( ) )
}
failedSeqs = append ( failedSeqs , seqs [ i ] )
msg := sdkws . MsgData { }
if err = msgprocessor . String2Pb ( res , & msg ) ; err != nil {
log . ZError ( ctx , "GetMessagesBySeq Unmarshal failed" , err , "res" , res , "conversationID" , conversationID , "seq" , seq )
failedSeqs = append ( failedSeqs , seq )
continue
}
if msg . Status == constant . MsgDeleted {
failedSeqs = append ( failedSeqs , seq )
continue
}
seqMsgs = append ( seqMsgs , & msg )
}
return seqMsgs , failedSeqs , err
return
//pipe := c.rdb.Pipeline()
//for _, v := range seqs {
// // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
// key := c.getMessageCacheKey(conversationID, v)
// if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
// return nil, nil, err
// }
//}
//result, err := pipe.Exec(ctx)
//for i, v := range result {
// cmd := v.(*redis.StringCmd)
// if cmd.Err() != nil {
// failedSeqs = append(failedSeqs, seqs[i])
// } else {
// msg := sdkws.MsgData{}
// err = msgprocessor.String2Pb(cmd.Val(), &msg)
// if err == nil {
// if msg.Status != constant.MsgDeleted {
// seqMsgs = append(seqMsgs, &msg)
// continue
// }
// } else {
// log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val())
// }
// failedSeqs = append(failedSeqs, seqs[i])
// }
//}
//return seqMsgs, failedSeqs, err
}
func ( c * msgCache ) SetMessageToCache ( ctx context . Context , conversationID string , msgs [ ] * sdkws . MsgData ) ( int , error ) {
pipe := c . rdb . Pipeline ( )
var failedMsgs [ ] * sdkws . MsgData
for _ , msg := range msgs {
key := c . getMessageCacheKey ( conversationID , msg . Seq )
s , err := msgprocessor . Pb2String ( msg )
if err != nil {
return 0 , errs . Wrap ( err )
}
err = pipe . Set ( ctx , key , s , time . Duration ( config . Config . MsgCacheTimeout ) * time . Second ) . Err ( )
if err != nil {
failedMsgs = append ( failedMsgs , msg )
log . ZWarn ( ctx , "set msg 2 cache failed" , err , "msg" , failedMsgs )
key := c . getMessageCacheKey ( conversationID , msg . Seq )
if err := c . rdb . Set ( ctx , key , s , time . Duration ( config . Config . MsgCacheTimeout ) * time . Second ) . Err ( ) ; err != nil {
return 0 , errs . Wrap ( err )
}
}
_ , err := pipe . Exec ( ctx )
return len ( failedMsgs ) , err
return len ( msgs ) , nil
//pipe := c.rdb.Pipeline()
//var failedMsgs []*sdkws.MsgData
//for _, msg := range msgs {
// key := c.getMessageCacheKey(conversationID, msg.Seq)
// s, err := msgprocessor.Pb2String(msg)
// if err != nil {
// return 0, errs.Wrap(err)
// }
// err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
// if err != nil {
// failedMsgs = append(failedMsgs, msg)
// log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs)
// }
//}
//_, err := pipe.Exec(ctx)
//return len(failedMsgs), err
}
func ( c * msgCache ) getMessageDelUserListKey ( conversationID string , seq int64 ) string {
@ -420,27 +435,47 @@ func (c *msgCache) getUserDelList(conversationID, userID string) string {
}
func ( c * msgCache ) UserDeleteMsgs ( ctx context . Context , conversationID string , seqs [ ] int64 , userID string ) error {
pipe := c . rdb . Pipeline ( )
for _ , seq := range seqs {
delUserListKey := c . getMessageDelUserListKey ( conversationID , seq )
userDelListKey := c . getUserDelList ( conversationID , userID )
err := pipe . SAdd ( ctx , delUserListKey , userID ) . Err ( )
err := c. rdb . SAdd ( ctx , delUserListKey , userID ) . Err ( )
if err != nil {
return errs . Wrap ( err )
}
err = pipe . SAdd ( ctx , userDelListKey , seq ) . Err ( )
err = c. rdb . SAdd ( ctx , userDelListKey , seq ) . Err ( )
if err != nil {
return errs . Wrap ( err )
}
if err := pipe . Expire ( ctx , delUserListKey , time . Duration ( config . Config . MsgCacheTimeout ) * time . Second ) . Err ( ) ; err != nil {
if err := c. rdb . Expire ( ctx , delUserListKey , time . Duration ( config . Config . MsgCacheTimeout ) * time . Second ) . Err ( ) ; err != nil {
return errs . Wrap ( err )
}
if err := pipe . Expire ( ctx , userDelListKey , time . Duration ( config . Config . MsgCacheTimeout ) * time . Second ) . Err ( ) ; err != nil {
if err := c. rdb . Expire ( ctx , userDelListKey , time . Duration ( config . Config . MsgCacheTimeout ) * time . Second ) . Err ( ) ; err != nil {
return errs . Wrap ( err )
}
}
_ , err := pipe . Exec ( ctx )
return errs . Wrap ( err )
return nil
//pipe := c.rdb.Pipeline()
//for _, seq := range seqs {
// delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
// userDelListKey := c.getUserDelList(conversationID, userID)
// err := pipe.SAdd(ctx, delUserListKey, userID).Err()
// if err != nil {
// return errs.Wrap(err)
// }
// err = pipe.SAdd(ctx, userDelListKey, seq).Err()
// if err != nil {
// return errs.Wrap(err)
// }
// if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
// return errs.Wrap(err)
// }
// if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return errs.Wrap(err)
}
func ( c * msgCache ) GetUserDelList ( ctx context . Context , userID , conversationID string ) ( seqs [ ] int64 , err error ) {
@ -452,6 +487,7 @@ func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID st
for i , v := range result {
seqs [ i ] = utils . StringToInt64 ( v )
}
return seqs , nil
}
@ -460,67 +496,102 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str
delUsers , err := c . rdb . SMembers ( ctx , c . getMessageDelUserListKey ( conversationID , seq ) ) . Result ( )
if err != nil {
log . ZWarn ( ctx , "DelUserDeleteMsgsList failed" , err , "conversationID" , conversationID , "seq" , seq )
continue
}
if len ( delUsers ) > 0 {
pipe := c . rdb . Pipeline ( )
var failedFlag bool
for _ , userID := range delUsers {
err = pipe . SRem ( ctx , c . getUserDelList ( conversationID , userID ) , seq ) . Err ( )
err = c. rdb . SRem ( ctx , c . getUserDelList ( conversationID , userID ) , seq ) . Err ( )
if err != nil {
failedFlag = true
log . ZWarn (
ctx ,
"DelUserDeleteMsgsList failed" ,
err ,
"conversationID" ,
conversationID ,
"seq" ,
seq ,
"userID" ,
userID ,
)
log . ZWarn ( ctx , "DelUserDeleteMsgsList failed" , err , "conversationID" , conversationID , "seq" , seq , "userID" , userID )
}
}
if ! failedFlag {
if err := pipe . Del ( ctx , c . getMessageDelUserListKey ( conversationID , seq ) ) . Err ( ) ; err != nil {
if err := c . rdb . Del ( ctx , c . getMessageDelUserListKey ( conversationID , seq ) ) . Err ( ) ; err != nil {
log . ZWarn ( ctx , "DelUserDeleteMsgsList failed" , err , "conversationID" , conversationID , "seq" , seq )
}
}
if _ , err := pipe . Exec ( ctx ) ; err != nil {
log . ZError ( ctx , "pipe exec failed" , err , "conversationID" , conversationID , "seq" , seq )
}
}
}
//for _, seq := range seqs {
// delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
// if err != nil {
// log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
// continue
// }
// if len(delUsers) > 0 {
// pipe := c.rdb.Pipeline()
// var failedFlag bool
// for _, userID := range delUsers {
// err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err()
// if err != nil {
// failedFlag = true
// log.ZWarn(
// ctx,
// "DelUserDeleteMsgsList failed",
// err,
// "conversationID",
// conversationID,
// "seq",
// seq,
// "userID",
// userID,
// )
// }
// }
// if !failedFlag {
// if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
// log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
// }
// }
// if _, err := pipe.Exec(ctx); err != nil {
// log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq)
// }
// }
//}
}
func ( c * msgCache ) DeleteMessages ( ctx context . Context , conversationID string , seqs [ ] int64 ) error {
pipe := c . rdb . Pipeline ( )
for _ , seq := range seqs {
if err := pipe . Del ( ctx , c . getMessageCacheKey ( conversationID , seq ) ) . Err ( ) ; err != nil {
if err := c. rdb . Del ( ctx , c . getMessageCacheKey ( conversationID , seq ) ) . Err ( ) ; err != nil {
return errs . Wrap ( err )
}
}
_ , err := pipe . Exec ( ctx )
return errs . Wrap ( err )
return nil
//pipe := c.rdb.Pipeline()
//for _, seq := range seqs {
// if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err := pipe.Exec(ctx)
//return errs.Wrap(err)
}
func ( c * msgCache ) CleanUpOneConversationAllMsg ( ctx context . Context , conversationID string ) error {
vals , err := c . rdb . Keys ( ctx , c . allMessageCacheKey ( conversationID ) ) . Result ( )
if err == redis . Nil {
if err ors. Is ( err , redis . Nil ) {
return nil
}
if err != nil {
return errs . Wrap ( err )
}
pipe := c . rdb . Pipeline ( )
for _ , v := range vals {
if err := pipe . Del ( ctx , v ) . Err ( ) ; err != nil {
if err := c. rdb . Del ( ctx , v ) . Err ( ) ; err != nil {
return errs . Wrap ( err )
}
}
_ , err = pipe . Exec ( ctx )
return errs . Wrap ( err )
return nil
//pipe := c.rdb.Pipeline()
//for _, v := range vals {
// if err := pipe.Del(ctx, v).Err(); err != nil {
// return errs.Wrap(err)
// }
//}
//_, err = pipe.Exec(ctx)
//return errs.Wrap(err)
}
func ( c * msgCache ) DelMsgFromCache ( ctx context . Context , userID string , seqs [ ] int64 ) error {
@ -528,13 +599,15 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
key := c . getMessageCacheKey ( userID , seq )
result , err := c . rdb . Get ( ctx , key ) . Result ( )
if err != nil {
if err == redis . Nil {
if err ors. Is ( err , redis . Nil ) {
continue
}
return errs . Wrap ( err )
}
var msg sdkws . MsgData
if err := jsonpb . UnmarshalString ( result , & msg ) ; err != nil {
err = jsonpb . UnmarshalString ( result , & msg )
if err != nil {
return err
}
msg . Status = constant . MsgDeleted
@ -546,6 +619,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
return errs . Wrap ( err )
}
}
return nil
}
@ -571,20 +645,12 @@ func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32
func ( c * msgCache ) GetSendMsgStatus ( ctx context . Context , id string ) ( int32 , error ) {
result , err := c . rdb . Get ( ctx , sendMsgFailedFlag + id ) . Int ( )
return int32 ( result ) , errs . Wrap ( err )
}
func ( c * msgCache ) SetFcmToken (
ctx context . Context ,
account string ,
platformID int ,
fcmToken string ,
expireTime int64 ,
) ( err error ) {
return errs . Wrap (
c . rdb . Set ( ctx , fcmToken + account + ":" + strconv . Itoa ( platformID ) , fcmToken , time . Duration ( expireTime ) * time . Second ) .
Err ( ) ,
)
func ( c * msgCache ) SetFcmToken ( ctx context . Context , account string , platformID int , fcmToken string , expireTime int64 ) ( err error ) {
return errs . Wrap ( c . rdb . Set ( ctx , fcmToken + account + ":" + strconv . Itoa ( platformID ) , fcmToken , time . Duration ( expireTime ) * time . Second ) . Err ( ) )
}
func ( c * msgCache ) GetFcmToken ( ctx context . Context , account string , platformID int ) ( string , error ) {
@ -597,6 +663,7 @@ func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID i
func ( c * msgCache ) IncrUserBadgeUnreadCountSum ( ctx context . Context , userID string ) ( int , error ) {
seq , err := c . rdb . Incr ( ctx , userBadgeUnreadCountSum + userID ) . Result ( )
return int ( seq ) , errs . Wrap ( err )
}
@ -610,11 +677,13 @@ func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string
func ( c * msgCache ) LockMessageTypeKey ( ctx context . Context , clientMsgID string , TypeKey string ) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs . Wrap ( c . rdb . SetNX ( ctx , key , 1 , time . Minute ) . Err ( ) )
}
func ( c * msgCache ) UnLockMessageTypeKey ( ctx context . Context , clientMsgID string , TypeKey string ) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs . Wrap ( c . rdb . Del ( ctx , key ) . Err ( ) )
}
@ -629,6 +698,7 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in
case constant . NotificationChatType :
return "EX_NOTIFICATION" + clientMsgID
}
return ""
}
@ -637,6 +707,7 @@ func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID st
if err != nil {
return false , utils . Wrap ( err , "" )
}
return n > 0 , nil
}
@ -649,21 +720,11 @@ func (c *msgCache) SetMessageTypeKeyValue(
return errs . Wrap ( c . rdb . HSet ( ctx , c . getMessageReactionExPrefix ( clientMsgID , sessionType ) , typeKey , value ) . Err ( ) )
}
func ( c * msgCache ) SetMessageReactionExpire (
ctx context . Context ,
clientMsgID string ,
sessionType int32 ,
expiration time . Duration ,
) ( bool , error ) {
func ( c * msgCache ) SetMessageReactionExpire ( ctx context . Context , clientMsgID string , sessionType int32 , expiration time . Duration ) ( bool , error ) {
return utils . Wrap2 ( c . rdb . Expire ( ctx , c . getMessageReactionExPrefix ( clientMsgID , sessionType ) , expiration ) . Result ( ) )
}
func ( c * msgCache ) GetMessageTypeKeyValue (
ctx context . Context ,
clientMsgID string ,
sessionType int32 ,
typeKey string ,
) ( string , error ) {
func ( c * msgCache ) GetMessageTypeKeyValue ( ctx context . Context , clientMsgID string , sessionType int32 , typeKey string ) ( string , error ) {
return utils . Wrap2 ( c . rdb . HGet ( ctx , c . getMessageReactionExPrefix ( clientMsgID , sessionType ) , typeKey ) . Result ( ) )
}