pull/2442/head
withchao 1 year ago
parent 8d5a139f51
commit a04e0e8ec2

@ -44,38 +44,38 @@ func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID s
return cachekey.GetSeqUserReadSeqKey(conversationID, userID) return cachekey.GetSeqUserReadSeqKey(conversationID, userID)
} }
func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (s *seqUserCacheRedis) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) { return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID) return s.mgo.GetUserMaxSeq(ctx, conversationID, userID)
}) })
} }
func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { func (s *seqUserCacheRedis) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil { if err := s.mgo.SetUserMaxSeq(ctx, conversationID, userID, seq); err != nil {
return err return err
} }
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID)) return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
} }
func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (s *seqUserCacheRedis) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) { return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID) return s.mgo.GetUserMinSeq(ctx, conversationID, userID)
}) })
} }
func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { func (s *seqUserCacheRedis) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.SetMinSeqs(ctx, userID, map[string]int64{conversationID: seq}) return s.SetUserMinSeqs(ctx, userID, map[string]int64{conversationID: seq})
} }
func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) { return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID) return s.mgo.GetUserReadSeq(ctx, conversationID, userID)
}) })
} }
func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 { if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil { if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err return err
} }
} }
@ -85,10 +85,10 @@ func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID strin
return nil return nil
} }
func (s *seqUserCacheRedis) SetMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs)) keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs { for conversationID, seq := range seqs {
if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil { if err := s.mgo.SetUserMinSeq(ctx, conversationID, userID, seq); err != nil {
return err return err
} }
keys = append(keys, s.getSeqUserMinSeqKey(conversationID, userID)) keys = append(keys, s.getSeqUserMinSeqKey(conversationID, userID))
@ -96,7 +96,7 @@ func (s *seqUserCacheRedis) SetMinSeqs(ctx context.Context, userID string, seqs
return DeleteCacheBySlot(ctx, s.rocks, keys) return DeleteCacheBySlot(ctx, s.rocks, keys)
} }
func (s *seqUserCacheRedis) setRedisReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error { func (s *seqUserCacheRedis) setUserRedisReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs)) keys := make([]string, 0, len(seqs))
keySeq := make(map[string]int64) keySeq := make(map[string]int64)
for conversationID, seq := range seqs { for conversationID, seq := range seqs {
@ -121,16 +121,16 @@ func (s *seqUserCacheRedis) setRedisReadSeqs(ctx context.Context, userID string,
return nil return nil
} }
func (s *seqUserCacheRedis) SetReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error { func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
if len(seqs) == 0 { if len(seqs) == 0 {
return nil return nil
} }
if err := s.setRedisReadSeqs(ctx, userID, seqs); err != nil { if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err return err
} }
for conversationID, seq := range seqs { for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 { if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil { if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err return err
} }
} }
@ -138,13 +138,13 @@ func (s *seqUserCacheRedis) SetReadSeqs(ctx context.Context, userID string, seqs
return nil return nil
} }
func (s *seqUserCacheRedis) GetReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { func (s *seqUserCacheRedis) GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
res, err := batchGetCache2(ctx, s.rocks, s.readExpireTime, conversationIDs, func(conversationID string) string { res, err := batchGetCache2(ctx, s.rocks, s.readExpireTime, conversationIDs, func(conversationID string) string {
return s.getSeqUserReadSeqKey(conversationID, userID) return s.getSeqUserReadSeqKey(conversationID, userID)
}, func(v *readSeqModel) string { }, func(v *readSeqModel) string {
return v.ConversationID return v.ConversationID
}, func(ctx context.Context, conversationIDs []string) ([]*readSeqModel, error) { }, func(ctx context.Context, conversationIDs []string) ([]*readSeqModel, error) {
seqs, err := s.mgo.GetReadSeqs(ctx, userID, conversationIDs) seqs, err := s.mgo.GetUserReadSeqs(ctx, userID, conversationIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -3,13 +3,13 @@ package cache
import "context" import "context"
type SeqUser interface { type SeqUser interface {
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
} }

@ -334,7 +334,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap { for userID, seq := range userSeqMap {
if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil { if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err return err
} }
} }
@ -498,7 +498,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
// "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group. // "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group.
// This ensures that their message retrieval starts from the point they joined. // This ensures that their message retrieval starts from the point they joined.
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
@ -576,7 +576,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
} }
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
@ -674,12 +674,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 { if len(seqs) > 0 {
userMinSeq := seqs[len(seqs)-1] + 1 userMinSeq := seqs[len(seqs)-1] + 1
currentUserMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if currentUserMinSeq < userMinSeq { if currentUserMinSeq < userMinSeq {
if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err return nil, err
} }
} }
@ -794,23 +794,23 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int
} }
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
return db.seqUser.SetMinSeqs(ctx, userID, seqs) return db.seqUser.SetUserMinSeqs(ctx, userID, seqs)
} }
func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return db.seqUser.SetReadSeqs(ctx, userID, hasReadSeqs) return db.seqUser.SetUserReadSeqs(ctx, userID, hasReadSeqs)
} }
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetReadSeq(ctx, conversationID, userID, hasReadSeq) return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
} }
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seqUser.GetReadSeqs(ctx, userID, conversationIDs) return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
} }
func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return db.seqUser.GetReadSeq(ctx, conversationID, userID) return db.seqUser.GetUserReadSeq(ctx, conversationID, userID)
} }
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {

@ -68,27 +68,27 @@ func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID
} }
} }
func (s *seqUserMongo) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (s *seqUserMongo) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "max_seq") return s.getSeq(ctx, conversationID, userID, "max_seq")
} }
func (s *seqUserMongo) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { func (s *seqUserMongo) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "max_seq") return s.setSeq(ctx, conversationID, userID, seq, "max_seq")
} }
func (s *seqUserMongo) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (s *seqUserMongo) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "min_seq") return s.getSeq(ctx, conversationID, userID, "min_seq")
} }
func (s *seqUserMongo) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { func (s *seqUserMongo) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "min_seq") return s.setSeq(ctx, conversationID, userID, seq, "min_seq")
} }
func (s *seqUserMongo) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "read_seq") return s.getSeq(ctx, conversationID, userID, "read_seq")
} }
func (s *seqUserMongo) GetReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) { func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
if len(conversationID) == 0 { if len(conversationID) == 0 {
return map[string]int64{}, nil return map[string]int64{}, nil
} }
@ -105,6 +105,6 @@ func (s *seqUserMongo) GetReadSeqs(ctx context.Context, userID string, conversat
return res, nil return res, nil
} }
func (s *seqUserMongo) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "read_seq") return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
} }

@ -3,11 +3,11 @@ package database
import "context" import "context"
type SeqUser interface { type SeqUser interface {
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error)
} }

@ -130,14 +130,14 @@ func Main(conf string, del time.Duration) error {
if err != nil { if err != nil {
return 0, err return 0, err
} }
return uSeq.GetReadSeq(ctx, conversationID, userID) return uSeq.GetUserReadSeq(ctx, conversationID, userID)
}, },
SetSeq: func(ctx context.Context, id string, seq int64) error { SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitHasReadSeq(id) conversationID, userID, err := uSpitHasReadSeq(id)
if err != nil { if err != nil {
return err return err
} }
return uSeq.SetReadSeq(ctx, conversationID, userID, seq) return uSeq.SetUserReadSeq(ctx, conversationID, userID, seq)
}, },
}, },
{ {
@ -147,14 +147,14 @@ func Main(conf string, del time.Duration) error {
if err != nil { if err != nil {
return 0, err return 0, err
} }
return uSeq.GetMinSeq(ctx, conversationID, userID) return uSeq.GetUserMinSeq(ctx, conversationID, userID)
}, },
SetSeq: func(ctx context.Context, id string, seq int64) error { SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitConversationUserMinSeq(id) conversationID, userID, err := uSpitConversationUserMinSeq(id)
if err != nil { if err != nil {
return err return err
} }
return uSeq.SetMinSeq(ctx, conversationID, userID, seq) return uSeq.SetUserMinSeq(ctx, conversationID, userID, seq)
}, },
}, },
} }

Loading…
Cancel
Save