fix: user seq bug (#2442)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

* group member sort

* sub

* sub

* fix: seq conversion bug

* fix: redis pipe exec

* sort version

* sort version

* sort version

* remove old version online subscription

* remove old version online subscription

* version log index

* version log index

* batch push

* batch push

* seq void filling

* fix: batchGetMaxSeq

* fix: batchGetMaxSeq

* cache db error log

* 111

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
pull/2444/head v3.8.0-rc.0
chao 4 months ago committed by GitHub
parent 80b332cb82
commit ebdc91a966
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/singleflight"
"time"
@ -49,6 +50,7 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac
}
values, err := fn(ctx, queryIds)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "keys", keys, "queryIds", queryIds)
return nil, err
}
if len(values) == 0 {

@ -118,6 +118,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
t, err = fn(ctx)
if err != nil {
log.ZError(ctx, "getCache query database failed", err, "key", key)
return "", err
}
bs, err := json.Marshal(t)

@ -44,38 +44,38 @@ func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID s
return cachekey.GetSeqUserReadSeqKey(conversationID, userID)
}
func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserCacheRedis) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
return s.mgo.GetUserMaxSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil {
func (s *seqUserCacheRedis) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if err := s.mgo.SetUserMaxSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
}
func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserCacheRedis) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
return s.mgo.GetUserMinSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.SetMinSeqs(ctx, userID, map[string]int64{conversationID: seq})
func (s *seqUserCacheRedis) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.SetUserMinSeqs(ctx, userID, map[string]int64{conversationID: seq})
}
func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) {
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
return s.mgo.GetUserReadSeq(ctx, conversationID, userID)
})
}
func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
@ -85,10 +85,10 @@ func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID strin
return nil
}
func (s *seqUserCacheRedis) SetMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil {
if err := s.mgo.SetUserMinSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
keys = append(keys, s.getSeqUserMinSeqKey(conversationID, userID))
@ -96,7 +96,7 @@ func (s *seqUserCacheRedis) SetMinSeqs(ctx context.Context, userID string, seqs
return DeleteCacheBySlot(ctx, s.rocks, keys)
}
func (s *seqUserCacheRedis) setRedisReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
func (s *seqUserCacheRedis) setUserRedisReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
keySeq := make(map[string]int64)
for conversationID, seq := range seqs {
@ -121,16 +121,16 @@ func (s *seqUserCacheRedis) setRedisReadSeqs(ctx context.Context, userID string,
return nil
}
func (s *seqUserCacheRedis) SetReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
if len(seqs) == 0 {
return nil
}
if err := s.setRedisReadSeqs(ctx, userID, seqs); err != nil {
if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
@ -138,13 +138,13 @@ func (s *seqUserCacheRedis) SetReadSeqs(ctx context.Context, userID string, seqs
return nil
}
func (s *seqUserCacheRedis) GetReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
func (s *seqUserCacheRedis) GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
res, err := batchGetCache2(ctx, s.rocks, s.readExpireTime, conversationIDs, func(conversationID string) string {
return s.getSeqUserReadSeqKey(conversationID, userID)
}, func(v *readSeqModel) string {
return v.ConversationID
}, func(ctx context.Context, conversationIDs []string) ([]*readSeqModel, error) {
seqs, err := s.mgo.GetReadSeqs(ctx, userID, conversationIDs)
seqs, err := s.mgo.GetUserReadSeqs(ctx, userID, conversationIDs)
if err != nil {
return nil, err
}

@ -3,13 +3,13 @@ package cache
import "context"
type SeqUser interface {
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
}

@ -334,7 +334,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
@ -498,7 +498,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
// "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group.
// This ensures that their message retrieval starts from the point they joined.
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
}
@ -576,7 +576,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
}
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err
}
@ -674,12 +674,12 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 {
userMinSeq := seqs[len(seqs)-1] + 1
currentUserMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID)
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
if err != nil {
return nil, err
}
if currentUserMinSeq < userMinSeq {
if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
}
}
@ -794,23 +794,23 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int
}
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
return db.seqUser.SetMinSeqs(ctx, userID, seqs)
return db.seqUser.SetUserMinSeqs(ctx, userID, seqs)
}
func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return db.seqUser.SetReadSeqs(ctx, userID, hasReadSeqs)
return db.seqUser.SetUserReadSeqs(ctx, userID, hasReadSeqs)
}
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetReadSeq(ctx, conversationID, userID, hasReadSeq)
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seqUser.GetReadSeqs(ctx, userID, conversationIDs)
return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
}
func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return db.seqUser.GetReadSeq(ctx, conversationID, userID)
return db.seqUser.GetUserReadSeq(ctx, conversationID, userID)
}
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {

@ -68,27 +68,27 @@ func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID
}
}
func (s *seqUserMongo) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserMongo) GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "max_seq")
}
func (s *seqUserMongo) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserMongo) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "max_seq")
}
func (s *seqUserMongo) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserMongo) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "min_seq")
}
func (s *seqUserMongo) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserMongo) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "min_seq")
}
func (s *seqUserMongo) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
return s.getSeq(ctx, conversationID, userID, "read_seq")
}
func (s *seqUserMongo) GetReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
if len(conversationID) == 0 {
return map[string]int64{}, nil
}
@ -105,6 +105,6 @@ func (s *seqUserMongo) GetReadSeqs(ctx context.Context, userID string, conversat
return res, nil
}
func (s *seqUserMongo) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
}

@ -3,11 +3,11 @@ package database
import "context"
type SeqUser interface {
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error)
GetUserMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error)
}

@ -130,14 +130,14 @@ func Main(conf string, del time.Duration) error {
if err != nil {
return 0, err
}
return uSeq.GetReadSeq(ctx, conversationID, userID)
return uSeq.GetUserReadSeq(ctx, conversationID, userID)
},
SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitHasReadSeq(id)
if err != nil {
return err
}
return uSeq.SetReadSeq(ctx, conversationID, userID, seq)
return uSeq.SetUserReadSeq(ctx, conversationID, userID, seq)
},
},
{
@ -147,14 +147,14 @@ func Main(conf string, del time.Duration) error {
if err != nil {
return 0, err
}
return uSeq.GetMinSeq(ctx, conversationID, userID)
return uSeq.GetUserMinSeq(ctx, conversationID, userID)
},
SetSeq: func(ctx context.Context, id string, seq int64) error {
conversationID, userID, err := uSpitConversationUserMinSeq(id)
if err != nil {
return err
}
return uSeq.SetMinSeq(ctx, conversationID, userID, seq)
return uSeq.SetUserMinSeq(ctx, conversationID, userID, seq)
},
},
}

Loading…
Cancel
Save