From 32c2c37efad04f30bde9106e57ac93de7234cb5d Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Sun, 7 Apr 2024 16:27:54 +0800 Subject: [PATCH] feat: msg cache split --- internal/msgtransfer/init.go | 5 +- internal/push/offlinepush/fcm/push.go | 4 +- internal/push/push_rpc_server.go | 2 +- internal/push/push_to_client.go | 2 +- internal/rpc/msg/server.go | 4 +- pkg/common/db/cache/msg.go | 479 +++++++------------------- pkg/common/db/cache/seq.go | 182 ++++++++++ pkg/common/db/cache/third.go | 85 +++++ pkg/common/db/controller/msg.go | 167 +++++---- pkg/common/db/controller/push.go | 2 +- 10 files changed, 483 insertions(+), 449 deletions(-) create mode 100644 pkg/common/db/cache/seq.go create mode 100644 pkg/common/db/cache/third.go diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 9c185b2e1..3ca93bd2a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -77,12 +77,13 @@ func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, ind } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) + msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis) + seqModel := cache.NewSeqCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err } - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, &config.Kafka) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka) if err != nil { return err } diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index d2f772724..e4808ab2d 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -35,12 +35,12 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan type Fcm struct { fcmMsgCli *messaging.Client - cache cache.MsgModel + cache cache.ThirdCache } // NewClient initializes a new FCM client using the Firebase Admin SDK. // It requires the FCM service account credentials file located within the project's configuration directory. -func NewClient(pushConf *config.Push, cache cache.MsgModel) (*Fcm, error) { +func NewClient(pushConf *config.Push, cache cache.ThirdCache) (*Fcm, error) { projectRoot, err := config.GetProjectRoot() if err != nil { return nil, err diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index afb141091..57c1bb4d0 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -40,7 +40,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv if err != nil { return err } - cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) + cacheModel := cache.NewThirdCache(rdb) offlinePusher, err := NewOfflinePusher(&config.Push, &config.IOSPush, cacheModel) if err != nil { return err diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 5cf11512f..41675087f 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -78,7 +78,7 @@ func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistr } } -func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.MsgModel) (offlinepush.OfflinePusher, error) { +func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.ThirdCache) (offlinepush.OfflinePusher, error) { var offlinePusher offlinepush.OfflinePusher switch pushConf.Enable { case "getui": diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 65032c11f..0b6a24644 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -69,11 +69,13 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv if err != nil { return err } + msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis) + seqModel := cache.NewSeqCache(rdb) conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka) if err != nil { return err } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 8857a80a3..56ea2dd8b 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -21,7 +21,6 @@ import ( "time" "github.com/gogo/protobuf/jsonpb" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" @@ -52,47 +51,13 @@ const ( var concurrentLimit = 3 -type SeqCache interface { - SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error - GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error - SetMinSeqs(ctx context.Context, seqs map[string]int64) error - GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMinSeq(ctx context.Context, conversationID string) (int64, error) - GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) - GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) - SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error - // seqs map: key userID value minSeq - SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) - // seqs map: key conversationID value minSeq - SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error - // has read seq - SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error - // k: user, v: seq - SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error - // k: conversation, v :seq - UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error - GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) - GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) -} - -type thirdCache interface { - SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) - GetFcmToken(ctx context.Context, account string, platformID int) (string, error) - DelFcmToken(ctx context.Context, account string, platformID int) error - IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error - GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - SetGetuiToken(ctx context.Context, token string, expireTime int64) error - GetGetuiToken(ctx context.Context) (string, error) - SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error - GetGetuiTaskID(ctx context.Context) (string, error) -} - -type MsgModel interface { - SeqCache - thirdCache +//type MsgModel interface { +// SeqCache +// ThirdCache +// MsgCache +//} + +type MsgCache interface { GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error @@ -113,269 +78,31 @@ type MsgModel interface { UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error } -func NewMsgCacheModel(client redis.UniversalClient, msgCacheTimeout int, redisConf *config.Redis) MsgModel { - return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisConf: redisConf} +//func NewMsgCacheModel(client redis.UniversalClient, msgCacheTimeout int, redisConf *config.Redis) MsgModel { +// return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisConf: redisConf} +//} + +func NewMsgCache(client redis.UniversalClient, msgCacheTimeout time.Duration, redisEnablePipeline bool) MsgCache { + return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisEnablePipeline: redisEnablePipeline} } type msgCache struct { metaCache - rdb redis.UniversalClient - msgCacheTimeout int - redisConf *config.Redis -} - -func (c *msgCache) getMaxSeqKey(conversationID string) string { - return maxSeq + conversationID -} - -func (c *msgCache) getMinSeqKey(conversationID string) string { - return minSeq + conversationID -} - -func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string { - return hasReadSeq + userID + ":" + conversationID -} - -func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string { - return conversationUserMinSeq + conversationID + "u:" + userID -} - -func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { - return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) -} - -func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { - val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64() - if err != nil { - return 0, errs.Wrap(err) - } - return val, nil -} - -func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { - m = make(map[string]int64, len(items)) - for i, v := range items { - res, err := c.rdb.Get(ctx, getkey(v)).Result() - if err != nil && err != redis.Nil { - return nil, errs.Wrap(err) - } - val := stringutil.StringToInt64(res) - if val != 0 { - m[items[i]] = val - } - } - - return m, nil -} - -func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) -} - -func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { - return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) -} - -func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMaxSeqKey) -} - -func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) -} - -func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { - for conversationID, seq := range seqs { - if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { - return errs.Wrap(err) - } - } - return nil -} - -func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { - return c.setSeqs(ctx, seqs, c.getMinSeqKey) -} - -func (c *msgCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) -} - -func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMinSeqKey) -} - -func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { - val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() - if err != nil { - return 0, errs.Wrap(err) - } - return val, nil -} - -func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { - return c.getSeqs(ctx, userIDs, func(userID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) -} - -func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { - return c.setSeqs(ctx, seqs, func(userID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) { - return c.setSeqs(ctx, seqs, func(conversationID string) string { - return c.getConversationUserMinSeqKey(conversationID, userID) - }) -} - -func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) -} - -func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error { - return c.setSeqs(ctx, hasReadSeqs, func(userID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *msgCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { - return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { - return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { - return c.getHasReadSeqKey(conversationID, userID) - }) -} - -func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64() - if err != nil { - return 0, err - } - return val, nil -} - -func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string { - return messageCache + conversationID + "_" + strconv.Itoa(int(seq)) + rdb redis.UniversalClient + msgCacheTimeout time.Duration + redisEnablePipeline bool } func (c *msgCache) allMessageCacheKey(conversationID string) string { return messageCache + conversationID + "_*" } -func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - if c.redisConf.EnablePipeline { - return c.PipeGetMessagesBySeq(ctx, conversationID, seqs) - } - - return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs) -} - -func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - pipe := c.rdb.Pipeline() - - results := []*redis.StringCmd{} - for _, seq := range seqs { - results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq))) - } - - _, err = pipe.Exec(ctx) - if err != nil && err != redis.Nil { - return seqMsgs, failedSeqs, errs.WrapMsg(err, "pipe.get") - } - - for idx, res := range results { - seq := seqs[idx] - if res.Err() != nil { - log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err()) - failedSeqs = append(failedSeqs, seq) - continue - } - - msg := sdkws.MsgData{} - if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil { - log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) - failedSeqs = append(failedSeqs, seq) - continue - } - - if msg.Status == constant.MsgDeleted { - failedSeqs = append(failedSeqs, seq) - continue - } - - seqMsgs = append(seqMsgs, &msg) - } - - return -} - -func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - type entry struct { - err error - msg *sdkws.MsgData - } - - wg := errgroup.Group{} - wg.SetLimit(concurrentLimit) - - results := make([]entry, len(seqs)) // set slice len/cap to length of seqs. - for idx, seq := range seqs { - // closure safe var - idx := idx - seq := seq - - wg.Go(func() error { - res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result() - if err != nil { - log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq) - results[idx] = entry{err: err} - return nil - } - - msg := sdkws.MsgData{} - if err = msgprocessor.String2Pb(res, &msg); err != nil { - log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) - results[idx] = entry{err: err} - return nil - } - - if msg.Status == constant.MsgDeleted { - results[idx] = entry{err: err} - return nil - } - - results[idx] = entry{msg: &msg} - return nil - }) - } - - _ = wg.Wait() - - for idx, res := range results { - if res.err != nil { - failedSeqs = append(failedSeqs, seqs[idx]) - continue - } - - seqMsgs = append(seqMsgs, res.msg) - } - - return +func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string { + return messageCache + conversationID + "_" + strconv.Itoa(int(seq)) } func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { - if c.redisConf.EnablePipeline { + if c.redisEnablePipeline { return c.PipeSetMessageToCache(ctx, conversationID, msgs) } return c.ParallelSetMessageToCache(ctx, conversationID, msgs) @@ -390,7 +117,7 @@ func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID str } key := c.getMessageCacheKey(conversationID, msg.Seq) - _ = pipe.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second) + _ = pipe.Set(ctx, key, s, c.msgCacheTimeout) } results, err := pipe.Exec(ctx) @@ -420,7 +147,7 @@ func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID } key := c.getMessageCacheKey(conversationID, msg.Seq) - if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil { return errs.Wrap(err) } return nil @@ -455,10 +182,10 @@ func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, se if err != nil { return errs.Wrap(err) } - if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, delUserListKey, c.msgCacheTimeout).Err(); err != nil { return errs.Wrap(err) } - if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, userDelListKey, c.msgCacheTimeout).Err(); err != nil { return errs.Wrap(err) } } @@ -563,7 +290,7 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str } func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { - if c.redisConf.EnablePipeline { + if c.redisEnablePipeline { return c.PipeDeleteMessages(ctx, conversationID, seqs) } @@ -645,7 +372,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in if err != nil { return errs.Wrap(err) } - if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil { return errs.Wrap(err) } } @@ -653,30 +380,6 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in return nil } -func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { - return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) -} - -func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) { - val, err := c.rdb.Get(ctx, getuiToken).Result() - if err != nil { - return "", errs.Wrap(err) - } - return val, nil -} - -func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { - return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) -} - -func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { - val, err := c.rdb.Get(ctx, getuiTaskID).Result() - if err != nil { - return "", errs.Wrap(err) - } - return val, nil -} - func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { return errs.Wrap(c.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) } @@ -687,37 +390,6 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro return int32(result), errs.Wrap(err) } -func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { - return errs.Wrap(c.rdb.Set(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) -} - -func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { - val, err := c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result() - if err != nil { - return "", errs.Wrap(err) - } - return val, nil -} - -func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { - return errs.Wrap(c.rdb.Del(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Err()) -} - -func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { - seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() - - return int(seq), errs.Wrap(err) -} - -func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { - return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) -} - -func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { - val, err := c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int() - return val, errs.Wrap(err) -} - func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey @@ -776,3 +448,104 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) } + +func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + if c.redisEnablePipeline { + return c.PipeGetMessagesBySeq(ctx, conversationID, seqs) + } + + return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs) +} + +func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + pipe := c.rdb.Pipeline() + + results := []*redis.StringCmd{} + for _, seq := range seqs { + results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq))) + } + + _, err = pipe.Exec(ctx) + if err != nil && err != redis.Nil { + return seqMsgs, failedSeqs, errs.WrapMsg(err, "pipe.get") + } + + for idx, res := range results { + seq := seqs[idx] + if res.Err() != nil { + log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err()) + failedSeqs = append(failedSeqs, seq) + continue + } + + msg := sdkws.MsgData{} + if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil { + log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) + failedSeqs = append(failedSeqs, seq) + continue + } + + if msg.Status == constant.MsgDeleted { + failedSeqs = append(failedSeqs, seq) + continue + } + + seqMsgs = append(seqMsgs, &msg) + } + + return +} + +func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + type entry struct { + err error + msg *sdkws.MsgData + } + + wg := errgroup.Group{} + wg.SetLimit(concurrentLimit) + + results := make([]entry, len(seqs)) // set slice len/cap to length of seqs. + for idx, seq := range seqs { + // closure safe var + idx := idx + seq := seq + + wg.Go(func() error { + res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result() + if err != nil { + log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq) + results[idx] = entry{err: err} + return nil + } + + msg := sdkws.MsgData{} + if err = msgprocessor.String2Pb(res, &msg); err != nil { + log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) + results[idx] = entry{err: err} + return nil + } + + if msg.Status == constant.MsgDeleted { + results[idx] = entry{err: err} + return nil + } + + results[idx] = entry{msg: &msg} + return nil + }) + } + + _ = wg.Wait() + + for idx, res := range results { + if res.err != nil { + failedSeqs = append(failedSeqs, seqs[idx]) + continue + } + + seqMsgs = append(seqMsgs, res.msg) + } + + return +} diff --git a/pkg/common/db/cache/seq.go b/pkg/common/db/cache/seq.go new file mode 100644 index 000000000..6fbb09183 --- /dev/null +++ b/pkg/common/db/cache/seq.go @@ -0,0 +1,182 @@ +package cache + +import ( + "context" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/stringutil" + "github.com/redis/go-redis/v9" +) + +type SeqCache interface { + SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error + GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error + SetMinSeqs(ctx context.Context, seqs map[string]int64) error + GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) + SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error + // seqs map: key userID value minSeq + SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) + // seqs map: key conversationID value minSeq + SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error + // has read seq + SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + // k: user, v: seq + SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error + // k: conversation, v :seq + UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error + GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) + GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) +} + +func NewSeqCache(rdb redis.UniversalClient) SeqCache { + return &seqCache{rdb: rdb} +} + +type seqCache struct { + rdb redis.UniversalClient +} + +func (c *seqCache) getMaxSeqKey(conversationID string) string { + return maxSeq + conversationID +} + +func (c *seqCache) getMinSeqKey(conversationID string) string { + return minSeq + conversationID +} + +func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string { + return hasReadSeq + userID + ":" + conversationID +} + +func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string { + return conversationUserMinSeq + conversationID + "u:" + userID +} + +func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { + return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) +} + +func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { + val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64() + if err != nil { + return 0, errs.Wrap(err) + } + return val, nil +} + +func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { + m = make(map[string]int64, len(items)) + for i, v := range items { + res, err := c.rdb.Get(ctx, getkey(v)).Result() + if err != nil && err != redis.Nil { + return nil, errs.Wrap(err) + } + val := stringutil.StringToInt64(res) + if val != 0 { + m[items[i]] = val + } + } + + return m, nil +} + +func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { + return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) +} + +func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { + return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) +} + +func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { + return c.getSeq(ctx, conversationID, c.getMaxSeqKey) +} + +func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { + return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) +} + +func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { + for conversationID, seq := range seqs { + if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { + return errs.Wrap(err) + } + } + return nil +} + +func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { + return c.setSeqs(ctx, seqs, c.getMinSeqKey) +} + +func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { + return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) +} + +func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + return c.getSeq(ctx, conversationID, c.getMinSeqKey) +} + +func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() + if err != nil { + return 0, errs.Wrap(err) + } + return val, nil +} + +func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { + return c.getSeqs(ctx, userIDs, func(userID string) string { + return c.getConversationUserMinSeqKey(conversationID, userID) + }) +} + +func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { + return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) +} + +func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { + return c.setSeqs(ctx, seqs, func(userID string) string { + return c.getConversationUserMinSeqKey(conversationID, userID) + }) +} + +func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) { + return c.setSeqs(ctx, seqs, func(conversationID string) string { + return c.getConversationUserMinSeqKey(conversationID, userID) + }) +} + +func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { + return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) +} + +func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error { + return c.setSeqs(ctx, hasReadSeqs, func(userID string) string { + return c.getHasReadSeqKey(conversationID, userID) + }) +} + +func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { + return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string { + return c.getHasReadSeqKey(conversationID, userID) + }) +} + +func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { + return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { + return c.getHasReadSeqKey(conversationID, userID) + }) +} + +func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { + val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64() + if err != nil { + return 0, err + } + return val, nil +} diff --git a/pkg/common/db/cache/third.go b/pkg/common/db/cache/third.go new file mode 100644 index 000000000..d2900a32d --- /dev/null +++ b/pkg/common/db/cache/third.go @@ -0,0 +1,85 @@ +package cache + +import ( + "context" + "github.com/openimsdk/tools/errs" + "github.com/redis/go-redis/v9" + "strconv" + "time" +) + +type ThirdCache interface { + SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) + GetFcmToken(ctx context.Context, account string, platformID int) (string, error) + DelFcmToken(ctx context.Context, account string, platformID int) error + IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error + GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + SetGetuiToken(ctx context.Context, token string, expireTime int64) error + GetGetuiToken(ctx context.Context) (string, error) + SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error + GetGetuiTaskID(ctx context.Context) (string, error) +} + +func NewThirdCache(rdb redis.UniversalClient) ThirdCache { + return &thirdCache{rdb: rdb} +} + +type thirdCache struct { + rdb redis.UniversalClient +} + +func (c *thirdCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { + return errs.Wrap(c.rdb.Set(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) +} + +func (c *thirdCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { + val, err := c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result() + if err != nil { + return "", errs.Wrap(err) + } + return val, nil +} + +func (c *thirdCache) DelFcmToken(ctx context.Context, account string, platformID int) error { + return errs.Wrap(c.rdb.Del(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Err()) +} + +func (c *thirdCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() + + return int(seq), errs.Wrap(err) +} + +func (c *thirdCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { + return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) +} + +func (c *thirdCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + val, err := c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int() + return val, errs.Wrap(err) +} + +func (c *thirdCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { + return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) +} + +func (c *thirdCache) GetGetuiToken(ctx context.Context) (string, error) { + val, err := c.rdb.Get(ctx, getuiToken).Result() + if err != nil { + return "", errs.Wrap(err) + } + return val, nil +} + +func (c *thirdCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { + return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) +} + +func (c *thirdCache) GetGetuiTaskID(ctx context.Context) (string, error) { + val, err := c.rdb.Get(ctx, getuiTaskID).Result() + if err != nil { + return "", errs.Wrap(err) + } + return val, nil +} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index e680cd491..6a4e70ea6 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -105,7 +105,7 @@ type CommonMsgDatabase interface { ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } -func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { +func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { conf, err := kafka.BuildProducerConfig(kafkaConf.Config) if err != nil { return nil, err @@ -124,7 +124,8 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel } return &commonMsgDatabase{ msgDocDatabase: msgDocModel, - cache: cacheModel, + msg: msg, + seq: seq, producer: producerToRedis, producerToMongo: producerToMongo, producerToPush: producerToPush, @@ -132,18 +133,20 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel } func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) { - cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) msgDocModel, err := mgo.NewMsgMongo(database) if err != nil { return nil, err } - return NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka) + msg := cache.NewMsgCache(rdb, config.MsgCacheTimeout, config.Redis.EnablePipeline) + seq := cache.NewSeqCache(rdb) + return NewCommonMsgDatabase(msgDocModel, msg, seq, config.Kafka) } type commonMsgDatabase struct { msgDocDatabase relation.MsgDocModelInterface - msg relation.MsgDocModel - cache cache.MsgModel + msgTable relation.MsgDocModel + msg cache.MsgCache + seq cache.SeqCache producer *kafka.Producer producerToMongo *kafka.Producer producerToModify *kafka.Producer @@ -184,7 +187,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI if len(fields) == 0 { return nil } - num := db.msg.GetSingleGocMsgNum() + num := db.msgTable.GetSingleGocMsgNum() // num = 100 for i, field := range fields { // Check the type of the field var ok bool @@ -210,8 +213,8 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI res *mongo.UpdateResult err error ) - docID := db.msg.GetDocID(conversationID, seq) - index := db.msg.GetMsgIndex(seq) + docID := db.msgTable.GetDocID(conversationID, seq) + index := db.msgTable.GetMsgIndex(seq) field := fields[i] switch key { case updateKeyMsg: @@ -237,23 +240,23 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI } } doc := relation.MsgDocModel{ - DocID: db.msg.GetDocID(conversationID, seq), + DocID: db.msgTable.GetDocID(conversationID, seq), Msg: make([]*relation.MsgInfoModel, num), } var insert int // Inserted data number for j := i; j < len(fields); j++ { seq = firstSeq + int64(j) - if db.msg.GetDocID(conversationID, seq) != doc.DocID { + if db.msgTable.GetDocID(conversationID, seq) != doc.DocID { break } insert++ switch key { case updateKeyMsg: - doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{ + doc.Msg[db.msgTable.GetMsgIndex(seq)] = &relation.MsgInfoModel{ Msg: fields[j].(*relation.MsgDataModel), } case updateKeyRevoke: - doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{ + doc.Msg[db.msgTable.GetMsgIndex(seq)] = &relation.MsgInfoModel{ Revoke: fields[j].(*relation.RevokeModel), } } @@ -332,10 +335,10 @@ func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID strin } func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { - for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, totalSeqs) { + for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, totalSeqs) { var indexes []int64 for _, seq := range seqs { - indexes = append(indexes, db.msg.GetMsgIndex(seq)) + indexes = append(indexes, db.msgTable.GetMsgIndex(seq)) } log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes) if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil { @@ -347,22 +350,22 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI } func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { - return db.cache.DeleteMessages(ctx, conversationID, seqs) + return db.msg.DeleteMessages(ctx, conversationID, seqs) } func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) { - db.cache.DelUserDeleteMsgsList(ctx, conversationID, seqs) + db.msg.DelUserDeleteMsgsList(ctx, conversationID, seqs) } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { - currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) + currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { - log.ZError(ctx, "db.cache.GetMaxSeq", err) + log.ZError(ctx, "db.seq.GetMaxSeq", err) return 0, false, err } lenList := len(msgs) - if int64(lenList) > db.msg.GetSingleGocMsgNum() { - return 0, false, errs.New("message count exceeds limit", "limit", db.msg.GetSingleGocMsgNum()).Wrap() + if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { + return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() } if lenList < 1 { return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() @@ -378,7 +381,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa userSeqMap[m.SendID] = m.Seq } - failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs) + failedNum, err := db.msg.SetMessageToCache(ctx, conversationID, msgs) if err != nil { prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) @@ -386,13 +389,13 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa prommetrics.MsgInsertRedisSuccessCounter.Inc() } - err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) + err = db.seq.SetMaxSeq(ctx, conversationID, currentMaxSeq) if err != nil { - log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID) + log.ZError(ctx, "db.seq.SetMaxSeq error", err, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() } - err = db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) + err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap) if err != nil { log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() @@ -401,7 +404,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { - for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { + for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { @@ -441,7 +444,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ msgs = v } else { if quoteMsg.QuoteMessage.Seq > 0 { - ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msg.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq}) + ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msgTable.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq}) if err != nil { log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq) return @@ -465,7 +468,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ return } msg.Msg.Content = string(data) - if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msg.GetDocID(conversationID, msg.Msg.Seq), db.msg.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { + if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msgTable.GetDocID(conversationID, msg.Msg.Seq), db.msgTable.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { log.ZError(ctx, "UpdateMsgContent", err) } } @@ -484,7 +487,7 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) { log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) - for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { + for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { @@ -520,11 +523,11 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin // "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group. // This ensures that their message retrieval starts from the point they joined. func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { - userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } - minSeq, err := db.cache.GetMinSeq(ctx, conversationID) + minSeq, err := db.seq.GetMinSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } @@ -536,7 +539,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end) return 0, 0, nil, nil } - maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) + maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } @@ -569,25 +572,13 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin } } - // 167 178 10 - // if end-num < { - // - //} - // var seqs []int64 - // for i := end; i > end-num; i-- { - // if i >= begin { - // seqs = append([]int64{i}, seqs...) - // } else { - // break - // } - //} if len(seqs) == 0 { return 0, 0, nil, nil } newBegin := seqs[0] newEnd := seqs[len(seqs)-1] log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) - cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) if err != nil { if err != redis.Nil { @@ -596,7 +587,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin } var successMsgs []*sdkws.MsgData if len(cachedMsgs) > 0 { - delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID) + delSeqs, err := db.msg.GetUserDelList(ctx, userID, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } @@ -624,7 +615,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin } if len(reGetSeqsCache) > 0 { log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache) - cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache) + cachedMsgs, failedSeqs2, err := db.msg.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache) if err != nil { if err != redis.Nil { @@ -654,15 +645,15 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin } func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { - userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } - minSeq, err := db.cache.GetMinSeq(ctx, conversationID) + minSeq, err := db.seq.GetMinSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } - maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) + maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } @@ -675,13 +666,13 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co newSeqs = append(newSeqs, seq) } } - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs) + successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) if err != nil { if err != redis.Nil { log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) } } - log.ZDebug(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", + log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) if len(failedSeqs) > 0 { @@ -708,12 +699,12 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont return nil } if remainTime == 0 { - err = db.cache.CleanUpOneConversationAllMsg(ctx, conversationID) + err = db.msg.CleanUpOneConversationAllMsg(ctx, conversationID) if err != nil { log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID) } } - return db.cache.SetMinSeq(ctx, conversationID, minSeq) + return db.seq.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { @@ -758,12 +749,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) if len(seqs) > 0 { userMinSeq := seqs[len(seqs)-1] + 1 - currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + currentUserMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return nil, err } if currentUserMinSeq < userMinSeq { - if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { + if err := db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { return nil, err } } @@ -804,7 +795,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio return delStruct.getSetMinSeq() + 1, nil } log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) - if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() { + if int64(len(msgDocModel.Msg)) > db.msgTable.GetSingleGocMsgNum() { log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) } if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < timeutil.GetCurrentTimestampByMill() { @@ -832,13 +823,13 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio } func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { - if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil { + if err := db.msg.DeleteMessages(ctx, conversationID, allSeqs); err != nil { return err } - for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { + for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { var indexes []int for _, seq := range seqs { - indexes = append(indexes, int(db.msg.GetMsgIndex(seq))) + indexes = append(indexes, int(db.msgTable.GetMsgIndex(seq))) } if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil { return err @@ -848,7 +839,7 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { - cachedMsgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + cachedMsgs, _, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) if err != nil && errs.Unwrap(err) != redis.Nil { log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs) return err @@ -858,14 +849,14 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st for _, msg := range cachedMsgs { cacheSeqs = append(cacheSeqs, msg.Seq) } - if err := db.cache.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil { + if err := db.msg.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil { return err } } - for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { + for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { for _, seq := range seqs { - if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { + if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { return err } } @@ -879,7 +870,7 @@ func (db *commonMsgDatabase) DeleteMsgsBySeqs(ctx context.Context, conversationI func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { for _, conversationID := range conversationIDs { - maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) + maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) if err != nil { if err == redis.Nil { log.ZDebug(ctx, "max seq is nil", "conversationID", conversationID) @@ -888,82 +879,82 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u } continue } - if err := db.cache.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { + if err := db.seq.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { log.ZError(ctx, "set min seq failed", err, "conversationID", conversationID, "minSeq", maxSeq+1) } } } func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return db.cache.SetMaxSeq(ctx, conversationID, maxSeq) + return db.seq.SetMaxSeq(ctx, conversationID, maxSeq) } func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return db.cache.GetMaxSeqs(ctx, conversationIDs) + return db.seq.GetMaxSeqs(ctx, conversationIDs) } func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return db.cache.GetMaxSeq(ctx, conversationID) + return db.seq.GetMaxSeq(ctx, conversationID) } func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return db.cache.SetMinSeq(ctx, conversationID, minSeq) + return db.seq.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { - return db.cache.SetMinSeqs(ctx, seqs) + return db.seq.SetMinSeqs(ctx, seqs) } func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return db.cache.GetMinSeqs(ctx, conversationIDs) + return db.seq.GetMinSeqs(ctx, conversationIDs) } func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return db.cache.GetMinSeq(ctx, conversationID) + return db.seq.GetMinSeq(ctx, conversationID) } func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { - return db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) } func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) { - return db.cache.GetConversationUserMinSeqs(ctx, conversationID, userIDs) + return db.seq.GetConversationUserMinSeqs(ctx, conversationID, userIDs) } func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { - return db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq) + return db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq) } func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { - return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs) + return db.seq.SetConversationUserMinSeqs(ctx, conversationID, seqs) } func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { - return db.cache.SetUserConversationsMinSeqs(ctx, userID, seqs) + return db.seq.SetUserConversationsMinSeqs(ctx, userID, seqs) } func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { - return db.cache.UserSetHasReadSeqs(ctx, userID, hasReadSeqs) + return db.seq.UserSetHasReadSeqs(ctx, userID, hasReadSeqs) } func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.cache.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq) + return db.seq.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq) } func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { - return db.cache.GetHasReadSeqs(ctx, userID, conversationIDs) + return db.seq.GetHasReadSeqs(ctx, userID, conversationIDs) } func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - return db.cache.GetHasReadSeq(ctx, userID, conversationID) + return db.seq.GetHasReadSeq(ctx, userID, conversationID) } func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { - return db.cache.SetSendMsgStatus(ctx, id, status) + return db.msg.SetSendMsgStatus(ctx, id, status) } func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { - return db.cache.GetSendMsgStatus(ctx, id) + return db.msg.GetSendMsgStatus(ctx, id) } func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { @@ -971,11 +962,11 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context if err != nil { return } - minSeqCache, err = db.cache.GetMinSeq(ctx, conversationID) + minSeqCache, err = db.seq.GetMinSeq(ctx, conversationID) if err != nil { return } - maxSeqCache, err = db.cache.GetMaxSeq(ctx, conversationID) + maxSeqCache, err = db.seq.GetMaxSeq(ctx, conversationID) if err != nil { return } @@ -1042,12 +1033,12 @@ func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationID totalMsgs := make(map[string]*sdkws.MsgData) for _, conversationID := range conversationIDs { seq := seqs[conversationID] - docID := db.msg.GetDocID(conversationID, seq) + docID := db.msgTable.GetDocID(conversationID, seq) msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) if err != nil { return nil, err } - index := db.msg.GetMsgIndex(seq) + index := db.msgTable.GetMsgIndex(seq) totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg) } return totalMsgs, nil diff --git a/pkg/common/db/controller/push.go b/pkg/common/db/controller/push.go index 76d8c3efb..cf72027ee 100644 --- a/pkg/common/db/controller/push.go +++ b/pkg/common/db/controller/push.go @@ -28,7 +28,7 @@ type pushDataBase struct { cache cache.MsgModel } -func NewPushDatabase(cache cache.MsgModel) PushDatabase { +func NewPushDatabase(cache cache.ThirdCache) PushDatabase { return &pushDataBase{cache: cache} }