feat: msg cache split

pull/2148/head
withchao 1 year ago
parent 46201b77c3
commit 32c2c37efa

@ -77,12 +77,13 @@ func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, ind
} }
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis)
seqModel := cache.NewSeqCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err
} }
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, &config.Kafka) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka)
if err != nil { if err != nil {
return err return err
} }

@ -35,12 +35,12 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan
type Fcm struct { type Fcm struct {
fcmMsgCli *messaging.Client fcmMsgCli *messaging.Client
cache cache.MsgModel cache cache.ThirdCache
} }
// NewClient initializes a new FCM client using the Firebase Admin SDK. // NewClient initializes a new FCM client using the Firebase Admin SDK.
// It requires the FCM service account credentials file located within the project's configuration directory. // It requires the FCM service account credentials file located within the project's configuration directory.
func NewClient(pushConf *config.Push, cache cache.MsgModel) (*Fcm, error) { func NewClient(pushConf *config.Push, cache cache.ThirdCache) (*Fcm, error) {
projectRoot, err := config.GetProjectRoot() projectRoot, err := config.GetProjectRoot()
if err != nil { if err != nil {
return nil, err return nil, err

@ -40,7 +40,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
if err != nil { if err != nil {
return err return err
} }
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) cacheModel := cache.NewThirdCache(rdb)
offlinePusher, err := NewOfflinePusher(&config.Push, &config.IOSPush, cacheModel) offlinePusher, err := NewOfflinePusher(&config.Push, &config.IOSPush, cacheModel)
if err != nil { if err != nil {
return err return err

@ -78,7 +78,7 @@ func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistr
} }
} }
func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.MsgModel) (offlinepush.OfflinePusher, error) { func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.ThirdCache) (offlinepush.OfflinePusher, error) {
var offlinePusher offlinepush.OfflinePusher var offlinePusher offlinepush.OfflinePusher
switch pushConf.Enable { switch pushConf.Enable {
case "getui": case "getui":

@ -69,11 +69,13 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
if err != nil { if err != nil {
return err return err
} }
msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis)
seqModel := cache.NewSeqCache(rdb)
conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka)
if err != nil { if err != nil {
return err return err
} }

@ -21,7 +21,6 @@ import (
"time" "time"
"github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/jsonpb"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
@ -52,47 +51,13 @@ const (
var concurrentLimit = 3 var concurrentLimit = 3
type SeqCache interface { //type MsgModel interface {
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error // SeqCache
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) // ThirdCache
GetMaxSeq(ctx context.Context, conversationID string) (int64, error) // MsgCache
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error //}
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) type MsgCache interface {
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
// seqs map: key userID value minSeq
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// seqs map: key conversationID value minSeq
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
// has read seq
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
// k: user, v: seq
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
// k: conversation, v :seq
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
}
type thirdCache interface {
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
}
type MsgModel interface {
SeqCache
thirdCache
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error
@ -113,269 +78,31 @@ type MsgModel interface {
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
} }
func NewMsgCacheModel(client redis.UniversalClient, msgCacheTimeout int, redisConf *config.Redis) MsgModel { //func NewMsgCacheModel(client redis.UniversalClient, msgCacheTimeout int, redisConf *config.Redis) MsgModel {
return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisConf: redisConf} // return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisConf: redisConf}
//}
func NewMsgCache(client redis.UniversalClient, msgCacheTimeout time.Duration, redisEnablePipeline bool) MsgCache {
return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisEnablePipeline: redisEnablePipeline}
} }
type msgCache struct { type msgCache struct {
metaCache metaCache
rdb redis.UniversalClient rdb redis.UniversalClient
msgCacheTimeout int msgCacheTimeout time.Duration
redisConf *config.Redis redisEnablePipeline bool
}
func (c *msgCache) getMaxSeqKey(conversationID string) string {
return maxSeq + conversationID
}
func (c *msgCache) getMinSeqKey(conversationID string) string {
return minSeq + conversationID
}
func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string {
return hasReadSeq + userID + ":" + conversationID
}
func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return conversationUserMinSeq + conversationID + "u:" + userID
}
func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
for i, v := range items {
res, err := c.rdb.Get(ctx, getkey(v)).Result()
if err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
val := stringutil.StringToInt64(res)
if val != 0 {
m[items[i]] = val
}
}
return m, nil
}
func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
}
func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
}
func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
}
func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
}
func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs {
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *msgCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
}
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
} }
func (c *msgCache) allMessageCacheKey(conversationID string) string { func (c *msgCache) allMessageCacheKey(conversationID string) string {
return messageCache + conversationID + "_*" return messageCache + conversationID + "_*"
} }
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
if c.redisConf.EnablePipeline { return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
return c.PipeGetMessagesBySeq(ctx, conversationID, seqs)
}
return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs)
}
func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline()
results := []*redis.StringCmd{}
for _, seq := range seqs {
results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq)))
}
_, err = pipe.Exec(ctx)
if err != nil && err != redis.Nil {
return seqMsgs, failedSeqs, errs.WrapMsg(err, "pipe.get")
}
for idx, res := range results {
seq := seqs[idx]
if res.Err() != nil {
log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err())
failedSeqs = append(failedSeqs, seq)
continue
}
msg := sdkws.MsgData{}
if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
failedSeqs = append(failedSeqs, seq)
continue
}
if msg.Status == constant.MsgDeleted {
failedSeqs = append(failedSeqs, seq)
continue
}
seqMsgs = append(seqMsgs, &msg)
}
return
}
func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
type entry struct {
err error
msg *sdkws.MsgData
}
wg := errgroup.Group{}
wg.SetLimit(concurrentLimit)
results := make([]entry, len(seqs)) // set slice len/cap to length of seqs.
for idx, seq := range seqs {
// closure safe var
idx := idx
seq := seq
wg.Go(func() error {
res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
if err != nil {
log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
results[idx] = entry{err: err}
return nil
}
msg := sdkws.MsgData{}
if err = msgprocessor.String2Pb(res, &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
results[idx] = entry{err: err}
return nil
}
if msg.Status == constant.MsgDeleted {
results[idx] = entry{err: err}
return nil
}
results[idx] = entry{msg: &msg}
return nil
})
}
_ = wg.Wait()
for idx, res := range results {
if res.err != nil {
failedSeqs = append(failedSeqs, seqs[idx])
continue
}
seqMsgs = append(seqMsgs, res.msg)
}
return
} }
func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
if c.redisConf.EnablePipeline { if c.redisEnablePipeline {
return c.PipeSetMessageToCache(ctx, conversationID, msgs) return c.PipeSetMessageToCache(ctx, conversationID, msgs)
} }
return c.ParallelSetMessageToCache(ctx, conversationID, msgs) return c.ParallelSetMessageToCache(ctx, conversationID, msgs)
@ -390,7 +117,7 @@ func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID str
} }
key := c.getMessageCacheKey(conversationID, msg.Seq) key := c.getMessageCacheKey(conversationID, msg.Seq)
_ = pipe.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second) _ = pipe.Set(ctx, key, s, c.msgCacheTimeout)
} }
results, err := pipe.Exec(ctx) results, err := pipe.Exec(ctx)
@ -420,7 +147,7 @@ func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID
} }
key := c.getMessageCacheKey(conversationID, msg.Seq) key := c.getMessageCacheKey(conversationID, msg.Seq)
if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
return nil return nil
@ -455,10 +182,10 @@ func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, se
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Expire(ctx, delUserListKey, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Expire(ctx, userDelListKey, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
@ -563,7 +290,7 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str
} }
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
if c.redisConf.EnablePipeline { if c.redisEnablePipeline {
return c.PipeDeleteMessages(ctx, conversationID, seqs) return c.PipeDeleteMessages(ctx, conversationID, seqs)
} }
@ -645,7 +372,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
@ -653,30 +380,6 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
return nil return nil
} }
func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
}
func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
val, err := c.rdb.Get(ctx, getuiToken).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
}
func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
val, err := c.rdb.Get(ctx, getuiTaskID).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) return errs.Wrap(c.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
} }
@ -687,37 +390,6 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
return int32(result), errs.Wrap(err) return int32(result), errs.Wrap(err)
} }
func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
val, err := c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return errs.Wrap(c.rdb.Del(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Err())
}
func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), errs.Wrap(err)
}
func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
}
func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
val, err := c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()
return val, errs.Wrap(err)
}
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
@ -776,3 +448,104 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
} }
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
if c.redisEnablePipeline {
return c.PipeGetMessagesBySeq(ctx, conversationID, seqs)
}
return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs)
}
func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline()
results := []*redis.StringCmd{}
for _, seq := range seqs {
results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq)))
}
_, err = pipe.Exec(ctx)
if err != nil && err != redis.Nil {
return seqMsgs, failedSeqs, errs.WrapMsg(err, "pipe.get")
}
for idx, res := range results {
seq := seqs[idx]
if res.Err() != nil {
log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err())
failedSeqs = append(failedSeqs, seq)
continue
}
msg := sdkws.MsgData{}
if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
failedSeqs = append(failedSeqs, seq)
continue
}
if msg.Status == constant.MsgDeleted {
failedSeqs = append(failedSeqs, seq)
continue
}
seqMsgs = append(seqMsgs, &msg)
}
return
}
func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
type entry struct {
err error
msg *sdkws.MsgData
}
wg := errgroup.Group{}
wg.SetLimit(concurrentLimit)
results := make([]entry, len(seqs)) // set slice len/cap to length of seqs.
for idx, seq := range seqs {
// closure safe var
idx := idx
seq := seq
wg.Go(func() error {
res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
if err != nil {
log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
results[idx] = entry{err: err}
return nil
}
msg := sdkws.MsgData{}
if err = msgprocessor.String2Pb(res, &msg); err != nil {
log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
results[idx] = entry{err: err}
return nil
}
if msg.Status == constant.MsgDeleted {
results[idx] = entry{err: err}
return nil
}
results[idx] = entry{msg: &msg}
return nil
})
}
_ = wg.Wait()
for idx, res := range results {
if res.err != nil {
failedSeqs = append(failedSeqs, seqs[idx])
continue
}
seqMsgs = append(seqMsgs, res.msg)
}
return
}

@ -0,0 +1,182 @@
package cache
import (
"context"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
)
type SeqCache interface {
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
// seqs map: key userID value minSeq
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// seqs map: key conversationID value minSeq
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
// has read seq
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
// k: user, v: seq
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
// k: conversation, v :seq
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
}
func NewSeqCache(rdb redis.UniversalClient) SeqCache {
return &seqCache{rdb: rdb}
}
type seqCache struct {
rdb redis.UniversalClient
}
func (c *seqCache) getMaxSeqKey(conversationID string) string {
return maxSeq + conversationID
}
func (c *seqCache) getMinSeqKey(conversationID string) string {
return minSeq + conversationID
}
func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string {
return hasReadSeq + userID + ":" + conversationID
}
func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return conversationUserMinSeq + conversationID + "u:" + userID
}
func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
for i, v := range items {
res, err := c.rdb.Get(ctx, getkey(v)).Result()
if err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
val := stringutil.StringToInt64(res)
if val != 0 {
m[items[i]] = val
}
}
return m, nil
}
func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
}
func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
}
func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs {
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
}

@ -0,0 +1,85 @@
package cache
import (
"context"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
"strconv"
"time"
)
type ThirdCache interface {
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
}
func NewThirdCache(rdb redis.UniversalClient) ThirdCache {
return &thirdCache{rdb: rdb}
}
type thirdCache struct {
rdb redis.UniversalClient
}
func (c *thirdCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (c *thirdCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
val, err := c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *thirdCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return errs.Wrap(c.rdb.Del(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Err())
}
func (c *thirdCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), errs.Wrap(err)
}
func (c *thirdCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
}
func (c *thirdCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
val, err := c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()
return val, errs.Wrap(err)
}
func (c *thirdCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
}
func (c *thirdCache) GetGetuiToken(ctx context.Context) (string, error) {
val, err := c.rdb.Get(ctx, getuiToken).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}
func (c *thirdCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
}
func (c *thirdCache) GetGetuiTaskID(ctx context.Context) (string, error) {
val, err := c.rdb.Get(ctx, getuiTaskID).Result()
if err != nil {
return "", errs.Wrap(err)
}
return val, nil
}

@ -105,7 +105,7 @@ type CommonMsgDatabase interface {
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
} }
func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
conf, err := kafka.BuildProducerConfig(kafkaConf.Config) conf, err := kafka.BuildProducerConfig(kafkaConf.Config)
if err != nil { if err != nil {
return nil, err return nil, err
@ -124,7 +124,8 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel
} }
return &commonMsgDatabase{ return &commonMsgDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
cache: cacheModel, msg: msg,
seq: seq,
producer: producerToRedis, producer: producerToRedis,
producerToMongo: producerToMongo, producerToMongo: producerToMongo,
producerToPush: producerToPush, producerToPush: producerToPush,
@ -132,18 +133,20 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel
} }
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) { func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) {
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
msgDocModel, err := mgo.NewMsgMongo(database) msgDocModel, err := mgo.NewMsgMongo(database)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka) msg := cache.NewMsgCache(rdb, config.MsgCacheTimeout, config.Redis.EnablePipeline)
seq := cache.NewSeqCache(rdb)
return NewCommonMsgDatabase(msgDocModel, msg, seq, config.Kafka)
} }
type commonMsgDatabase struct { type commonMsgDatabase struct {
msgDocDatabase relation.MsgDocModelInterface msgDocDatabase relation.MsgDocModelInterface
msg relation.MsgDocModel msgTable relation.MsgDocModel
cache cache.MsgModel msg cache.MsgCache
seq cache.SeqCache
producer *kafka.Producer producer *kafka.Producer
producerToMongo *kafka.Producer producerToMongo *kafka.Producer
producerToModify *kafka.Producer producerToModify *kafka.Producer
@ -184,7 +187,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
if len(fields) == 0 { if len(fields) == 0 {
return nil return nil
} }
num := db.msg.GetSingleGocMsgNum() num := db.msgTable.GetSingleGocMsgNum()
// num = 100 // num = 100
for i, field := range fields { // Check the type of the field for i, field := range fields { // Check the type of the field
var ok bool var ok bool
@ -210,8 +213,8 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
res *mongo.UpdateResult res *mongo.UpdateResult
err error err error
) )
docID := db.msg.GetDocID(conversationID, seq) docID := db.msgTable.GetDocID(conversationID, seq)
index := db.msg.GetMsgIndex(seq) index := db.msgTable.GetMsgIndex(seq)
field := fields[i] field := fields[i]
switch key { switch key {
case updateKeyMsg: case updateKeyMsg:
@ -237,23 +240,23 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
} }
} }
doc := relation.MsgDocModel{ doc := relation.MsgDocModel{
DocID: db.msg.GetDocID(conversationID, seq), DocID: db.msgTable.GetDocID(conversationID, seq),
Msg: make([]*relation.MsgInfoModel, num), Msg: make([]*relation.MsgInfoModel, num),
} }
var insert int // Inserted data number var insert int // Inserted data number
for j := i; j < len(fields); j++ { for j := i; j < len(fields); j++ {
seq = firstSeq + int64(j) seq = firstSeq + int64(j)
if db.msg.GetDocID(conversationID, seq) != doc.DocID { if db.msgTable.GetDocID(conversationID, seq) != doc.DocID {
break break
} }
insert++ insert++
switch key { switch key {
case updateKeyMsg: case updateKeyMsg:
doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{ doc.Msg[db.msgTable.GetMsgIndex(seq)] = &relation.MsgInfoModel{
Msg: fields[j].(*relation.MsgDataModel), Msg: fields[j].(*relation.MsgDataModel),
} }
case updateKeyRevoke: case updateKeyRevoke:
doc.Msg[db.msg.GetMsgIndex(seq)] = &relation.MsgInfoModel{ doc.Msg[db.msgTable.GetMsgIndex(seq)] = &relation.MsgInfoModel{
Revoke: fields[j].(*relation.RevokeModel), Revoke: fields[j].(*relation.RevokeModel),
} }
} }
@ -332,10 +335,10 @@ func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID strin
} }
func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error {
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, totalSeqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, totalSeqs) {
var indexes []int64 var indexes []int64
for _, seq := range seqs { for _, seq := range seqs {
indexes = append(indexes, db.msg.GetMsgIndex(seq)) indexes = append(indexes, db.msgTable.GetMsgIndex(seq))
} }
log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes) log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes)
if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil { if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil {
@ -347,22 +350,22 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
} }
func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
return db.cache.DeleteMessages(ctx, conversationID, seqs) return db.msg.DeleteMessages(ctx, conversationID, seqs)
} }
func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) { func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
db.cache.DelUserDeleteMsgsList(ctx, conversationID, seqs) db.msg.DelUserDeleteMsgsList(ctx, conversationID, seqs)
} }
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
log.ZError(ctx, "db.cache.GetMaxSeq", err) log.ZError(ctx, "db.seq.GetMaxSeq", err)
return 0, false, err return 0, false, err
} }
lenList := len(msgs) lenList := len(msgs)
if int64(lenList) > db.msg.GetSingleGocMsgNum() { if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
return 0, false, errs.New("message count exceeds limit", "limit", db.msg.GetSingleGocMsgNum()).Wrap() return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
} }
if lenList < 1 { if lenList < 1 {
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
@ -378,7 +381,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
userSeqMap[m.SendID] = m.Seq userSeqMap[m.SendID] = m.Seq
} }
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs) failedNum, err := db.msg.SetMessageToCache(ctx, conversationID, msgs)
if err != nil { if err != nil {
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
@ -386,13 +389,13 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
prommetrics.MsgInsertRedisSuccessCounter.Inc() prommetrics.MsgInsertRedisSuccessCounter.Inc()
} }
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) err = db.seq.SetMaxSeq(ctx, conversationID, currentMaxSeq)
if err != nil { if err != nil {
log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID) log.ZError(ctx, "db.seq.SetMaxSeq error", err, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc() prommetrics.SeqSetFailedCounter.Inc()
} }
err = db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) err = db.seq.SetHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil { if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc() prommetrics.SeqSetFailedCounter.Inc()
@ -401,7 +404,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
} }
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
if err != nil { if err != nil {
@ -441,7 +444,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][
msgs = v msgs = v
} else { } else {
if quoteMsg.QuoteMessage.Seq > 0 { if quoteMsg.QuoteMessage.Seq > 0 {
ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msg.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq}) ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msgTable.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq})
if err != nil { if err != nil {
log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq) log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq)
return return
@ -465,7 +468,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][
return return
} }
msg.Msg.Content = string(data) msg.Msg.Content = string(data)
if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msg.GetDocID(conversationID, msg.Msg.Seq), db.msg.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msgTable.GetDocID(conversationID, msg.Msg.Seq), db.msgTable.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil {
log.ZError(ctx, "UpdateMsgContent", err) log.ZError(ctx, "UpdateMsgContent", err)
} }
} }
@ -484,7 +487,7 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) {
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
if err != nil { if err != nil {
@ -520,11 +523,11 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
// "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group. // "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group.
// This ensures that their message retrieval starts from the point they joined. // This ensures that their message retrieval starts from the point they joined.
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
minSeq, err := db.cache.GetMinSeq(ctx, conversationID) minSeq, err := db.seq.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
@ -536,7 +539,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end) log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end)
return 0, 0, nil, nil return 0, 0, nil, nil
} }
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
@ -569,25 +572,13 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
} }
} }
// 167 178 10
// if end-num < {
//
//}
// var seqs []int64
// for i := end; i > end-num; i-- {
// if i >= begin {
// seqs = append([]int64{i}, seqs...)
// } else {
// break
// }
//}
if len(seqs) == 0 { if len(seqs) == 0 {
return 0, 0, nil, nil return 0, 0, nil, nil
} }
newBegin := seqs[0] newBegin := seqs[0]
newEnd := seqs[len(seqs)-1] newEnd := seqs[len(seqs)-1]
log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd)
cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
@ -596,7 +587,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
} }
var successMsgs []*sdkws.MsgData var successMsgs []*sdkws.MsgData
if len(cachedMsgs) > 0 { if len(cachedMsgs) > 0 {
delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID) delSeqs, err := db.msg.GetUserDelList(ctx, userID, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
@ -624,7 +615,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
} }
if len(reGetSeqsCache) > 0 { if len(reGetSeqsCache) > 0 {
log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache) log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache)
cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache) cachedMsgs, failedSeqs2, err := db.msg.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache)
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
@ -654,15 +645,15 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
} }
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) userMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
minSeq, err := db.cache.GetMinSeq(ctx, conversationID) minSeq, err := db.seq.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
@ -675,13 +666,13 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
newSeqs = append(newSeqs, seq) newSeqs = append(newSeqs, seq)
} }
} }
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs) successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs)
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
} }
} }
log.ZDebug(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs",
seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs)
if len(failedSeqs) > 0 { if len(failedSeqs) > 0 {
@ -708,12 +699,12 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
return nil return nil
} }
if remainTime == 0 { if remainTime == 0 {
err = db.cache.CleanUpOneConversationAllMsg(ctx, conversationID) err = db.msg.CleanUpOneConversationAllMsg(ctx, conversationID)
if err != nil { if err != nil {
log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID) log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID)
} }
} }
return db.cache.SetMinSeq(ctx, conversationID, minSeq) return db.seq.SetMinSeq(ctx, conversationID, minSeq)
} }
func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) {
@ -758,12 +749,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 { if len(seqs) > 0 {
userMinSeq := seqs[len(seqs)-1] + 1 userMinSeq := seqs[len(seqs)-1] + 1
currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) currentUserMinSeq, err := db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err return nil, err
} }
if currentUserMinSeq < userMinSeq { if currentUserMinSeq < userMinSeq {
if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { if err := db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err return nil, err
} }
} }
@ -804,7 +795,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
return delStruct.getSetMinSeq() + 1, nil return delStruct.getSetMinSeq() + 1, nil
} }
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg))
if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() { if int64(len(msgDocModel.Msg)) > db.msgTable.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID)
} }
if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < timeutil.GetCurrentTimestampByMill() { if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < timeutil.GetCurrentTimestampByMill() {
@ -832,13 +823,13 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
} }
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil { if err := db.msg.DeleteMessages(ctx, conversationID, allSeqs); err != nil {
return err return err
} }
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) {
var indexes []int var indexes []int
for _, seq := range seqs { for _, seq := range seqs {
indexes = append(indexes, int(db.msg.GetMsgIndex(seq))) indexes = append(indexes, int(db.msgTable.GetMsgIndex(seq)))
} }
if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil { if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil {
return err return err
@ -848,7 +839,7 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
} }
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
cachedMsgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) cachedMsgs, _, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs) log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs)
return err return err
@ -858,14 +849,14 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st
for _, msg := range cachedMsgs { for _, msg := range cachedMsgs {
cacheSeqs = append(cacheSeqs, msg.Seq) cacheSeqs = append(cacheSeqs, msg.Seq)
} }
if err := db.cache.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil { if err := db.msg.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil {
return err return err
} }
} }
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
for _, seq := range seqs { for _, seq := range seqs {
if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {
return err return err
} }
} }
@ -879,7 +870,7 @@ func (db *commonMsgDatabase) DeleteMsgsBySeqs(ctx context.Context, conversationI
func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) {
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) maxSeq, err := db.seq.GetMaxSeq(ctx, conversationID)
if err != nil { if err != nil {
if err == redis.Nil { if err == redis.Nil {
log.ZDebug(ctx, "max seq is nil", "conversationID", conversationID) log.ZDebug(ctx, "max seq is nil", "conversationID", conversationID)
@ -888,82 +879,82 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u
} }
continue continue
} }
if err := db.cache.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { if err := db.seq.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
log.ZError(ctx, "set min seq failed", err, "conversationID", conversationID, "minSeq", maxSeq+1) log.ZError(ctx, "set min seq failed", err, "conversationID", conversationID, "minSeq", maxSeq+1)
} }
} }
} }
func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return db.cache.SetMaxSeq(ctx, conversationID, maxSeq) return db.seq.SetMaxSeq(ctx, conversationID, maxSeq)
} }
func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.cache.GetMaxSeqs(ctx, conversationIDs) return db.seq.GetMaxSeqs(ctx, conversationIDs)
} }
func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return db.cache.GetMaxSeq(ctx, conversationID) return db.seq.GetMaxSeq(ctx, conversationID)
} }
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return db.cache.SetMinSeq(ctx, conversationID, minSeq) return db.seq.SetMinSeq(ctx, conversationID, minSeq)
} }
func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return db.cache.SetMinSeqs(ctx, seqs) return db.seq.SetMinSeqs(ctx, seqs)
} }
func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return db.cache.GetMinSeqs(ctx, conversationIDs) return db.seq.GetMinSeqs(ctx, conversationIDs)
} }
func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return db.cache.GetMinSeq(ctx, conversationID) return db.seq.GetMinSeq(ctx, conversationID)
} }
func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID)
} }
func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) {
return db.cache.GetConversationUserMinSeqs(ctx, conversationID, userIDs) return db.seq.GetConversationUserMinSeqs(ctx, conversationID, userIDs)
} }
func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq) return db.seq.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq)
} }
func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs) return db.seq.SetConversationUserMinSeqs(ctx, conversationID, seqs)
} }
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
return db.cache.SetUserConversationsMinSeqs(ctx, userID, seqs) return db.seq.SetUserConversationsMinSeqs(ctx, userID, seqs)
} }
func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return db.cache.UserSetHasReadSeqs(ctx, userID, hasReadSeqs) return db.seq.UserSetHasReadSeqs(ctx, userID, hasReadSeqs)
} }
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.cache.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq) return db.seq.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq)
} }
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.cache.GetHasReadSeqs(ctx, userID, conversationIDs) return db.seq.GetHasReadSeqs(ctx, userID, conversationIDs)
} }
func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return db.cache.GetHasReadSeq(ctx, userID, conversationID) return db.seq.GetHasReadSeq(ctx, userID, conversationID)
} }
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.cache.SetSendMsgStatus(ctx, id, status) return db.msg.SetSendMsgStatus(ctx, id, status)
} }
func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
return db.cache.GetSendMsgStatus(ctx, id) return db.msg.GetSendMsgStatus(ctx, id)
} }
func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
@ -971,11 +962,11 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
if err != nil { if err != nil {
return return
} }
minSeqCache, err = db.cache.GetMinSeq(ctx, conversationID) minSeqCache, err = db.seq.GetMinSeq(ctx, conversationID)
if err != nil { if err != nil {
return return
} }
maxSeqCache, err = db.cache.GetMaxSeq(ctx, conversationID) maxSeqCache, err = db.seq.GetMaxSeq(ctx, conversationID)
if err != nil { if err != nil {
return return
} }
@ -1042,12 +1033,12 @@ func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationID
totalMsgs := make(map[string]*sdkws.MsgData) totalMsgs := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
seq := seqs[conversationID] seq := seqs[conversationID]
docID := db.msg.GetDocID(conversationID, seq) docID := db.msgTable.GetDocID(conversationID, seq)
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
index := db.msg.GetMsgIndex(seq) index := db.msgTable.GetMsgIndex(seq)
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg) totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
} }
return totalMsgs, nil return totalMsgs, nil

@ -28,7 +28,7 @@ type pushDataBase struct {
cache cache.MsgModel cache cache.MsgModel
} }
func NewPushDatabase(cache cache.MsgModel) PushDatabase { func NewPushDatabase(cache cache.ThirdCache) PushDatabase {
return &pushDataBase{cache: cache} return &pushDataBase{cache: cache}
} }

Loading…
Cancel
Save