|
|
|
@ -16,13 +16,12 @@ package cache
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"strconv"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
|
|
|
|
|
|
|
|
|
"github.com/dtm-labs/rockscache"
|
|
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/tools/errs"
|
|
|
|
|
|
|
|
|
|
"github.com/gogo/protobuf/jsonpb"
|
|
|
|
@ -33,7 +32,6 @@ import (
|
|
|
|
|
"github.com/OpenIMSDK/tools/utils"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
|
|
|
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
|
|
|
|
|
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
|
)
|
|
|
|
@ -140,10 +138,10 @@ func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
|
|
|
|
|
|
|
|
|
|
type msgCache struct {
|
|
|
|
|
metaCache
|
|
|
|
|
rdb redis.UniversalClient
|
|
|
|
|
expireTime time.Duration
|
|
|
|
|
rcClient *rockscache.Client
|
|
|
|
|
msgDocDatabase unrelationtb.MsgDocModelInterface
|
|
|
|
|
rdb redis.UniversalClient
|
|
|
|
|
// expireTime time.Duration
|
|
|
|
|
// rcClient *rockscache.Client
|
|
|
|
|
// msgDocDatabase unrelationtb.MsgDocModelInterface
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) getMaxSeqKey(conversationID string) string {
|
|
|
|
@ -182,18 +180,20 @@ func (c *msgCache) getSeqs(
|
|
|
|
|
) (m map[string]int64, err error) {
|
|
|
|
|
pipe := c.rdb.Pipeline()
|
|
|
|
|
for _, v := range items {
|
|
|
|
|
if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil {
|
|
|
|
|
return nil, errs.Wrap(err)
|
|
|
|
|
err2 := pipe.Get(ctx, getkey(v)).Err()
|
|
|
|
|
if err2 != nil && !errors.Is(err2, redis.Nil) {
|
|
|
|
|
return nil, errs.Wrap(err2)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
result, err := pipe.Exec(ctx)
|
|
|
|
|
if err != nil && err != redis.Nil {
|
|
|
|
|
if err != nil && !errors.Is(err, redis.Nil) {
|
|
|
|
|
return nil, errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
m = make(map[string]int64, len(items))
|
|
|
|
|
for i, v := range result {
|
|
|
|
|
seq := v.(*redis.StringCmd)
|
|
|
|
|
if seq.Err() != nil && seq.Err() != redis.Nil {
|
|
|
|
|
|
|
|
|
|
if seq.Err() != nil && !errors.Is(seq.Err(), redis.Nil) {
|
|
|
|
|
return nil, errs.Wrap(v.Err())
|
|
|
|
|
}
|
|
|
|
|
val := utils.StringToInt64(seq.Val())
|
|
|
|
@ -201,6 +201,7 @@ func (c *msgCache) getSeqs(
|
|
|
|
|
m[items[i]] = val
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return m, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -229,6 +230,7 @@ func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey fu
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err := pipe.Exec(ctx)
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -319,6 +321,7 @@ func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversatio
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
|
|
|
|
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -332,6 +335,7 @@ func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID string, pla
|
|
|
|
|
for k, v := range m {
|
|
|
|
|
mm[k] = utils.StringToInt(v)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return mm, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -341,11 +345,13 @@ func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platf
|
|
|
|
|
for k, v := range m {
|
|
|
|
|
mm[k] = v
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform int, fields []string) error {
|
|
|
|
|
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platform)
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -366,8 +372,9 @@ func (c *msgCache) GetMessagesBySeq(
|
|
|
|
|
for _, v := range seqs {
|
|
|
|
|
// MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
|
|
|
|
|
key := c.getMessageCacheKey(conversationID, v)
|
|
|
|
|
if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
|
|
|
|
|
return nil, nil, err
|
|
|
|
|
err2 := pipe.Get(ctx, key).Err()
|
|
|
|
|
if err2 != nil && errors.Is(err2, redis.Nil) {
|
|
|
|
|
return nil, nil, err2
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
result, err := pipe.Exec(ctx)
|
|
|
|
@ -381,6 +388,7 @@ func (c *msgCache) GetMessagesBySeq(
|
|
|
|
|
if err == nil {
|
|
|
|
|
if msg.Status != constant.MsgDeleted {
|
|
|
|
|
seqMsgs = append(seqMsgs, &msg)
|
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
@ -389,6 +397,7 @@ func (c *msgCache) GetMessagesBySeq(
|
|
|
|
|
failedSeqs = append(failedSeqs, seqs[i])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return seqMsgs, failedSeqs, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -408,6 +417,7 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err := pipe.Exec(ctx)
|
|
|
|
|
|
|
|
|
|
return len(failedMsgs), err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -440,6 +450,7 @@ func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, se
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err := pipe.Exec(ctx)
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -452,6 +463,7 @@ func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID st
|
|
|
|
|
for i, v := range result {
|
|
|
|
|
seqs[i] = utils.StringToInt64(v)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return seqs, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -460,6 +472,7 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str
|
|
|
|
|
delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
|
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if len(delUsers) > 0 {
|
|
|
|
@ -502,12 +515,13 @@ func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, se
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err := pipe.Exec(ctx)
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
|
|
|
|
|
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result()
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
if errors.Is(err, redis.Nil) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
@ -515,11 +529,13 @@ func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversatio
|
|
|
|
|
}
|
|
|
|
|
pipe := c.rdb.Pipeline()
|
|
|
|
|
for _, v := range vals {
|
|
|
|
|
if err := pipe.Del(ctx, v).Err(); err != nil {
|
|
|
|
|
return errs.Wrap(err)
|
|
|
|
|
err2 := pipe.Del(ctx, v).Err()
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
return errs.Wrap(err2)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err = pipe.Exec(ctx)
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -528,13 +544,15 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
|
|
|
|
|
key := c.getMessageCacheKey(userID, seq)
|
|
|
|
|
result, err := c.rdb.Get(ctx, key).Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
if errors.Is(err, redis.Nil) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
var msg sdkws.MsgData
|
|
|
|
|
if err := jsonpb.UnmarshalString(result, &msg); err != nil {
|
|
|
|
|
err = jsonpb.UnmarshalString(result, &msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
msg.Status = constant.MsgDeleted
|
|
|
|
@ -546,6 +564,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
|
|
|
|
|
return errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -571,6 +590,7 @@ func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
|
|
|
|
|
result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
|
|
|
|
|
|
|
|
|
|
return int32(result), errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -597,6 +617,7 @@ func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID i
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
|
|
|
|
|
seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
|
|
|
|
|
|
|
|
|
|
return int(seq), errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -610,11 +631,13 @@ func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
|
|
|
|
|
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
|
|
|
|
|
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
|
|
|
|
|
|
|
|
|
|
return errs.Wrap(c.rdb.Del(ctx, key).Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -629,6 +652,7 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in
|
|
|
|
|
case constant.NotificationChatType:
|
|
|
|
|
return "EX_NOTIFICATION" + clientMsgID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -637,6 +661,7 @@ func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID st
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return n > 0, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|