diff --git a/go.mod b/go.mod index 8e06e503f..c3d69b0fb 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.65 + github.com/openimsdk/protocol v0.0.69-alpha.1 github.com/openimsdk/tools v0.0.49-alpha.23 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 @@ -177,3 +177,7 @@ require ( golang.org/x/crypto v0.21.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +//replace ( +// github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol +//) diff --git a/go.sum b/go.sum index a80b2fb0a..908c15c87 100644 --- a/go.sum +++ b/go.sum @@ -286,8 +286,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= -github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.1 h1:l/PN8mwmh5O7PRoaiMZvey+hUxyuNNnMgPGKOfEMOKs= +github.com/openimsdk/protocol v0.0.69-alpha.1/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.23 h1:/KkJ7vfx8FAoJhq3veH9PWnxbSkEf+dTSshvDrHBR38= github.com/openimsdk/tools v0.0.49-alpha.23/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/friend/convert.go b/internal/rpc/friend/convert.go new file mode 100644 index 000000000..aa6895284 --- /dev/null +++ b/internal/rpc/friend/convert.go @@ -0,0 +1,26 @@ +package friend + +import ( + relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + "github.com/openimsdk/protocol/friend" + "github.com/openimsdk/tools/utils/datautil" +) + +func friendDB2PB(db *relationtb.FriendModel) *friend.FriendInfo { + return &friend.FriendInfo{ + OwnerUserID: db.OwnerUserID, + FriendUserID: db.FriendUserID, + FriendNickname: db.FriendNickname, + FriendFaceURL: db.FriendFaceURL, + Remark: db.Remark, + CreateTime: db.CreateTime.UnixMilli(), + AddSource: db.AddSource, + OperatorUserID: db.OperatorUserID, + Ex: db.Ex, + IsPinned: db.IsPinned, + } +} + +func friendsDB2PB(db []*relationtb.FriendModel) []*friend.FriendInfo { + return datautil.Slice(db, friendDB2PB) +} diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go new file mode 100644 index 000000000..1e74cbc0d --- /dev/null +++ b/internal/rpc/friend/sync.go @@ -0,0 +1,65 @@ +package friend + +import ( + "context" + "crypto/md5" + "encoding/binary" + "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + pbfriend "github.com/openimsdk/protocol/friend" +) + +func (s *friendServer) SearchFriends(ctx context.Context, req *pbfriend.SearchFriendsReq) (*pbfriend.SearchFriendsResp, error) { + //TODO implement me + panic("implement me") +} + +func (s *friendServer) sortFriendUserIDsHash(userIDs []string) uint64 { + data, _ := json.Marshal(userIDs) + sum := md5.Sum(data) + return binary.BigEndian.Uint64(sum[:]) +} + +func (s *friendServer) IncrSyncFriends(ctx context.Context, req *pbfriend.IncrSyncFriendsReq) (*pbfriend.IncrSyncFriendsResp, error) { + var limit int + if req.Version > 0 { + limit = s.config.RpcConfig.FriendSyncCount + } + incrVer, err := s.friendDatabase.FindFriendIncrVersion(ctx, req.UserID, uint(req.Version), limit) + if err != nil { + return nil, err + } + sortUserIDs, err := s.friendDatabase.FindSortFriendUserIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + if len(sortUserIDs) == 0 { + return &pbfriend.IncrSyncFriendsResp{ + Version: uint64(incrVer.Version), + Full: true, + SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), + }, nil + } + var changes []*relation.FriendModel + res := dataver.NewSyncResult(incrVer, sortUserIDs) + if len(res.Changes) > 0 { + changes, err = s.friendDatabase.FindFriendsWithError(ctx, req.UserID, res.Changes) + if err != nil { + return nil, err + } + } + calcHash := s.sortFriendUserIDsHash(sortUserIDs) + if calcHash == req.IdHash { + sortUserIDs = nil + } + return &pbfriend.IncrSyncFriendsResp{ + Version: uint64(res.Version), + Full: res.Full, + SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), + SortUserIdHash: calcHash, + SortUserIds: sortUserIDs, + DeleteUserIds: res.DeleteEID, + Changes: friendsDB2PB(changes), + }, nil +} diff --git a/pkg/common/cachekey/friend.go b/pkg/common/cachekey/friend.go index 9691b1f5c..6a217bdef 100644 --- a/pkg/common/cachekey/friend.go +++ b/pkg/common/cachekey/friend.go @@ -14,11 +14,14 @@ package cachekey +import "strconv" + const ( - FriendIDsKey = "FRIEND_IDS:" - TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" - FriendKey = "FRIEND_INFO:" - IsFriendKey = "IS_FRIEND:" // local cache key + FriendIDsKey = "FRIEND_IDS:" + TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + FriendKey = "FRIEND_INFO:" + IsFriendKey = "IS_FRIEND:" // local cache key + FriendSyncSortUserIDsKey = "FRIEND_SYNC_SORT_USER_IDS:" ) func GetFriendIDsKey(ownerUserID string) string { @@ -36,3 +39,7 @@ func GetFriendKey(ownerUserID, friendUserID string) string { func GetIsFriendKey(possibleFriendUserID, userID string) string { return IsFriendKey + possibleFriendUserID + "-" + userID } + +func GetFriendSyncSortUserIDsKey(ownerUserID string, count int) string { + return FriendSyncSortUserIDsKey + strconv.Itoa(count) + ":" + ownerUserID +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 12c4f7f78..881e05598 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -244,7 +244,8 @@ type Friend struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` + FriendSyncCount int `mapstructure:"friendSyncCount"` } type Group struct { diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 73fe5ea69..4a51a8c17 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -16,6 +16,7 @@ package cache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "time" "github.com/dtm-labs/rockscache" @@ -41,12 +42,19 @@ type FriendCache interface { GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) // Called when friendID list changed DelFriendIDs(ownerUserID ...string) FriendCache + + DelSortFriendUserIDs(ownerUserIDs ...string) FriendCache + // Get single friendInfo from the cache GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationtb.FriendModel, err error) // Delete friend when friend info changed DelFriend(ownerUserID, friendUserID string) FriendCache // Delete friends when friends' info changed DelFriends(ownerUserID string, friendUserIDs []string) FriendCache + + FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) + + FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) } // FriendCacheRedis is an implementation of the FriendCache interface using Redis. @@ -55,6 +63,7 @@ type FriendCacheRedis struct { friendDB relationtb.FriendModelInterface expireTime time.Duration rcClient *rockscache.Client + syncCount int } // NewFriendCacheRedis creates a new instance of FriendCacheRedis. @@ -89,6 +98,10 @@ func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string { return cachekey.GetFriendIDsKey(ownerUserID) } +func (f *FriendCacheRedis) getFriendSyncSortUserIDsKey(ownerUserID string) string { + return cachekey.GetFriendSyncSortUserIDsKey(ownerUserID, f.syncCount) +} + // getTwoWayFriendsIDsKey returns the key for storing two-way friend IDs in the cache. func (f *FriendCacheRedis) getTwoWayFriendsIDsKey(ownerUserID string) string { return cachekey.GetTwoWayFriendsIDsKey(ownerUserID) @@ -118,6 +131,16 @@ func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) FriendCache { return newGroupCache } +func (f *FriendCacheRedis) DelSortFriendUserIDs(ownerUserIDs ...string) FriendCache { + newGroupCache := f.NewCache() + keys := make([]string, 0, len(ownerUserIDs)) + for _, userID := range ownerUserIDs { + keys = append(keys, f.getFriendSyncSortUserIDsKey(userID)) + } + newGroupCache.AddKeys(keys...) + return newGroupCache +} + // GetTwoWayFriendIDs retrieves two-way friend IDs from the cache. func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) { friendIDs, err := f.GetFriendIDs(ctx, ownerUserID) @@ -172,3 +195,13 @@ func (f *FriendCacheRedis) DelFriends(ownerUserID string, friendUserIDs []string return newFriendCache } + +func (f *FriendCacheRedis) FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) { + return getCache(ctx, f.rcClient, f.getFriendSyncSortUserIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) { + return f.friendDB.FindOwnerFriendUserIds(ctx, ownerUserID, f.syncCount) + }) +} + +func (f *FriendCacheRedis) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) { + return f.friendDB.FindIncrVersion(ctx, ownerUserID, version, limit) +} diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index 49136f228..49c0cb990 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -17,6 +17,7 @@ package controller import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -75,6 +76,10 @@ type FriendDatabase interface { // UpdateFriends updates fields for friends UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) (err error) + + FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) + + FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) } type friendDatabase struct { @@ -173,7 +178,7 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, return err } newFriendIDs = append(newFriendIDs, ownerUserID) - cache = cache.DelFriendIDs(newFriendIDs...) + cache = cache.DelFriendIDs(newFriendIDs...).DelSortFriendUserIDs(ownerUserID) return cache.ExecDel(ctx) }) @@ -276,7 +281,7 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest * return err } } - return f.cache.DelFriendIDs(friendRequest.ToUserID, friendRequest.FromUserID).ExecDel(ctx) + return f.cache.DelFriendIDs(friendRequest.ToUserID, friendRequest.FromUserID).DelSortFriendUserIDs(friendRequest.ToUserID, friendRequest.FromUserID).ExecDel(ctx) }) } @@ -285,7 +290,7 @@ func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendU if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { return err } - return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) + return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).DelSortFriendUserIDs(ownerUserID).ExecDel(ctx) } // UpdateRemark updates the remark for a friend. Zero value for remark is also supported. @@ -342,5 +347,13 @@ func (f *friendDatabase) UpdateFriends(ctx context.Context, ownerUserID string, if err := f.friend.UpdateFriends(ctx, ownerUserID, friendUserIDs, val); err != nil { return err } - return f.cache.DelFriends(ownerUserID, friendUserIDs).ExecDel(ctx) + return f.cache.DelFriends(ownerUserID, friendUserIDs).DelSortFriendUserIDs(ownerUserID).ExecDel(ctx) +} + +func (f *friendDatabase) FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) { + return f.cache.FindSortFriendUserIDs(ctx, ownerUserID) +} + +func (f *friendDatabase) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) { + return f.cache.FindFriendIncrVersion(ctx, ownerUserID, version, limit) } diff --git a/pkg/common/db/dataver/common.go b/pkg/common/db/dataver/common.go index bc367d795..c9a4e0419 100644 --- a/pkg/common/db/dataver/common.go +++ b/pkg/common/db/dataver/common.go @@ -2,12 +2,14 @@ package dataver import ( "context" + "errors" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "time" ) @@ -192,7 +194,21 @@ func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) } +func (l *logModel) findDoc(ctx context.Context, dId string) (*WriteLog, error) { + res, err := mongoutil.FindOne[*WriteLog](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0})) + if err == nil { + return res, nil + } else if errors.Is(err, mongo.ErrNoDocuments) { + return &WriteLog{}, nil + } else { + return nil, err + } +} + func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { + if version == 0 && limit == 0 { + return l.findDoc(ctx, dId) + } pipeline := []bson.M{ { "$match": bson.M{ diff --git a/pkg/common/db/dataver/result.go b/pkg/common/db/dataver/result.go index b6e6d22b4..915a618d0 100644 --- a/pkg/common/db/dataver/result.go +++ b/pkg/common/db/dataver/result.go @@ -1,32 +1,31 @@ package dataver -type SyncResult[T any] struct { +import "github.com/openimsdk/tools/utils/datautil" + +type SyncResult struct { Version uint DeleteEID []string - Changes []T + Changes []string Full bool } -func NewSyncResult[T any](wl *WriteLog, find func(eIds []string) ([]T, error)) (*SyncResult[T], error) { +func NewSyncResult(wl *WriteLog, fullIDs []string) *SyncResult { var findEIDs []string - var res SyncResult[T] + var res SyncResult if wl.Full() { + res.Changes = fullIDs res.Full = true } else { + idSet := datautil.SliceSet(fullIDs) for _, l := range wl.Logs { if l.Deleted { res.DeleteEID = append(res.DeleteEID, l.EID) } else { - findEIDs = append(findEIDs, l.EID) + if _, ok := idSet[l.EID]; ok { + findEIDs = append(findEIDs, l.EID) + } } } } - if res.Full || len(findEIDs) > 0 { - var err error - res.Changes, err = find(findEIDs) - if err != nil { - return nil, err - } - } - return &res, nil + return &res } diff --git a/pkg/common/db/dataver/todo.go b/pkg/common/db/dataver/todo.go index 69b979fe5..cd2be14f0 100644 --- a/pkg/common/db/dataver/todo.go +++ b/pkg/common/db/dataver/todo.go @@ -19,7 +19,7 @@ UserIDs 顺序 全量同步有标识,只返回全量id 变更记录只包含id,不包括详细信息。 4.sdk通过变更记录,同步数据不一致进行重试。 - +5.先修改db,在自增版本号,外层加事务 diff --git a/pkg/common/db/mgo/friend.go b/pkg/common/db/mgo/friend.go index 78e74edd3..bca6873c2 100644 --- a/pkg/common/db/mgo/friend.go +++ b/pkg/common/db/mgo/friend.go @@ -16,7 +16,6 @@ package mgo import ( "context" - "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" @@ -55,7 +54,7 @@ func NewFriendMongo(db *mongo.Database) (relation.FriendModelInterface, error) { // Create inserts multiple friend records. func (f *FriendMgo) Create(ctx context.Context, friends []*relation.FriendModel) error { - return Success(func() error { + return IncrVersion(func() error { return mongoutil.InsertMany(ctx, f.coll, friends) }, func() error { mp := make(map[string][]string) @@ -77,7 +76,7 @@ func (f *FriendMgo) Delete(ctx context.Context, ownerUserID string, friendUserID "owner_user_id": ownerUserID, "friend_user_id": bson.M{"$in": friendUserIDs}, } - return Success(func() error { + return IncrVersion(func() error { return mongoutil.DeleteOne(ctx, f.coll, filter) }, func() error { return f.owner.WriteLog(ctx, ownerUserID, friendUserIDs, true) @@ -93,7 +92,7 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU "owner_user_id": ownerUserID, "friend_user_id": friendUserID, } - return Success(func() error { + return IncrVersion(func() error { return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true) }, func() error { return f.owner.WriteLog(ctx, ownerUserID, []string{friendUserID}, false) @@ -146,7 +145,14 @@ func (f *FriendMgo) FindReversalFriends(ctx context.Context, friendUserID string // FindOwnerFriends retrieves a paginated list of friends for a given owner. func (f *FriendMgo) FindOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relation.FriendModel, error) { filter := bson.M{"owner_user_id": ownerUserID} - return mongoutil.FindPage[*relation.FriendModel](ctx, f.coll, filter, pagination) + opt := options.Find().SetSort(bson.A{bson.M{"friend_nickname": 1}, bson.M{"create_time": 1}}) + return mongoutil.FindPage[*relation.FriendModel](ctx, f.coll, filter, pagination, opt) +} + +func (f *FriendMgo) FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) { + filter := bson.M{"owner_user_id": ownerUserID} + opt := options.Find().SetProjection(bson.M{"_id": 0, "friend_user_id": 1}).SetSort(bson.A{bson.M{"friend_nickname": 1}, bson.M{"create_time": 1}}).SetLimit(int64(limit)) + return mongoutil.Find[string](ctx, f.coll, filter, opt) } // FindInWhoseFriends finds users who have added the specified user as a friend, with pagination. @@ -176,29 +182,33 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien // Create an update document update := bson.M{"$set": val} - return Success(func() error { + return IncrVersion(func() error { return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update)) }, func() error { return f.owner.WriteLog(ctx, ownerUserID, friendUserIDs, false) }) } -func (f *FriendMgo) IncrSync(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.SyncResult[*relation.FriendModel], error) { - res, err := f.owner.FindChangeLog(ctx, ownerUserID, version, limit) - if err != nil { - return nil, err - } - return dataver.NewSyncResult[*relation.FriendModel](res, func(eIds []string) ([]*relation.FriendModel, error) { - if len(eIds) == 0 { - return nil, errors.New("todo") - } else { - return f.FindFriends(ctx, ownerUserID, eIds) - } - }) +func (f *FriendMgo) FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) { + return f.owner.FindChangeLog(ctx, ownerUserID, version, limit) } -func Success(fns ...func() error) error { - for _, fn := range fns { +//func (f *FriendMgo) IncrSync(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.SyncResult[*relation.FriendModel], error) { +// res, err := f.owner.FindChangeLog(ctx, ownerUserID, version, limit) +// if err != nil { +// return nil, err +// } +// return dataver.NewSyncResult[*relation.FriendModel](res, func(eIds []string) ([]*relation.FriendModel, error) { +// if len(eIds) == 0 { +// return nil, errors.New("todo") +// } else { +// return f.FindFriends(ctx, ownerUserID, eIds) +// } +// }) +//} + +func IncrVersion(dbs ...func() error) error { + for _, fn := range dbs { if err := fn(); err != nil { return err } diff --git a/pkg/common/db/table/relation/friend.go b/pkg/common/db/table/relation/friend.go index 4c84e773d..a28b2278d 100644 --- a/pkg/common/db/table/relation/friend.go +++ b/pkg/common/db/table/relation/friend.go @@ -16,6 +16,7 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "time" "github.com/openimsdk/tools/db/pagination" @@ -25,6 +26,8 @@ import ( type FriendModel struct { OwnerUserID string `bson:"owner_user_id"` FriendUserID string `bson:"friend_user_id"` + FriendNickname string `bson:"friend_nickname"` + FriendFaceURL string `bson:"friend_face_url"` Remark string `bson:"remark"` CreateTime time.Time `bson:"create_time"` AddSource int32 `bson:"add_source"` @@ -53,10 +56,14 @@ type FriendModelInterface interface { FindReversalFriends(ctx context.Context, friendUserID string, ownerUserIDs []string) (friends []*FriendModel, err error) // FindOwnerFriends retrieves a paginated list of friends for a given owner. FindOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, friends []*FriendModel, err error) + + FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) // FindInWhoseFriends finds users who have added the specified user as a friend, with pagination. FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*FriendModel, err error) // FindFriendUserIDs retrieves a list of friend user IDs for a given owner. FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) // UpdateFriends update friends' fields UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) (err error) + + FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) } diff --git a/pkg/common/listdemo2/common_test.go b/pkg/common/listdemo2/common_test.go index bcaf4a19a..c03cc31bd 100644 --- a/pkg/common/listdemo2/common_test.go +++ b/pkg/common/listdemo2/common_test.go @@ -2,10 +2,9 @@ package listdemo import ( "context" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "errors" + "github.com/openimsdk/tools/db/mongoutil" "testing" - "time" ) func Result[V any](val V, err error) V { @@ -22,20 +21,25 @@ func Check(err error) { } func TestName(t *testing.T) { - cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - coll := cli.Database("openim_v3").Collection("friend_version") - _ = coll - //Result(coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ - // { - // Keys: map[string]int{"user_id": 1}, - // }, - // { - // Keys: map[string]int{"friends.friend_user_id": 1}, - // }, - //})) + cli := Result(mongoutil.NewMongoDB(context.Background(), &mongoutil.Config{Uri: "mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100", Database: "openim_v3"})) + + db := cli.GetDB() + tx := cli.GetTx() const num = 1 - lm := &LogModel{coll: coll} + lm := &LogModel{coll: db.Collection("friend_version")} + + err := tx.Transaction(context.Background(), func(ctx context.Context) error { + err := tx.Transaction(ctx, func(ctx context.Context) error { + return lm.WriteLogBatch(ctx, "100", []string{"1000", "2000"}, true) + }) + if err != nil { + t.Log("--------->") + return err + } + return errors.New("1234") + }) + t.Log(err) //start := time.Now() //eIds := make([]string, 0, num) @@ -47,11 +51,11 @@ func TestName(t *testing.T) { //t.Log(end.Sub(start)) // 509.962208ms //t.Log(end.Sub(start) / num) // 511.496µs - start := time.Now() - wll, err := lm.FindChangeLog(context.Background(), "100", 3, 100) - if err != nil { - panic(err) - } - t.Log(time.Since(start)) - t.Log(wll) + //start := time.Now() + //wll, err := lm.FindChangeLog(context.Background(), "100", 3, 100) + //if err != nil { + // panic(err) + //} + //t.Log(time.Since(start)) + //t.Log(wll) }