friend incr sync

pull/2336/head
withchao 1 year ago
parent 1f02bdc267
commit 8e37a417db

@ -178,6 +178,6 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )
//replace ( replace (
// github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol
//) )

@ -90,6 +90,11 @@ func (o *FriendApi) GetFriendIDs(c *gin.Context) {
func (o *FriendApi) GetSpecifiedFriendsInfo(c *gin.Context) { func (o *FriendApi) GetSpecifiedFriendsInfo(c *gin.Context) {
a2r.Call(friend.FriendClient.GetSpecifiedFriendsInfo, o.Client, c) a2r.Call(friend.FriendClient.GetSpecifiedFriendsInfo, o.Client, c)
} }
func (o *FriendApi) UpdateFriends(c *gin.Context) { func (o *FriendApi) UpdateFriends(c *gin.Context) {
a2r.Call(friend.FriendClient.UpdateFriends, o.Client, c) a2r.Call(friend.FriendClient.UpdateFriends, o.Client, c)
} }
func (o *FriendApi) GetIncrementalFriends(c *gin.Context) {
a2r.Call(friend.FriendClient.GetIncrementalFriends, o.Client, c)
}

@ -86,6 +86,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs) friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs)
friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo) friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo)
friendRouterGroup.POST("/update_friends", f.UpdateFriends) friendRouterGroup.POST("/update_friends", f.UpdateFriends)
friendRouterGroup.POST("/get_incremental_friends", f.GetIncrementalFriends)
} }
g := NewGroupApi(*groupRpc) g := NewGroupApi(*groupRpc)
groupRouterGroup := r.Group("/group") groupRouterGroup := r.Group("/group")

@ -5,6 +5,7 @@ import (
"crypto/md5" "crypto/md5"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
pbfriend "github.com/openimsdk/protocol/friend" pbfriend "github.com/openimsdk/protocol/friend"
@ -21,7 +22,10 @@ func (s *friendServer) sortFriendUserIDsHash(userIDs []string) uint64 {
return binary.BigEndian.Uint64(sum[:]) return binary.BigEndian.Uint64(sum[:])
} }
func (s *friendServer) IncrSyncFriends(ctx context.Context, req *pbfriend.IncrSyncFriendsReq) (*pbfriend.IncrSyncFriendsResp, error) { func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *pbfriend.GetIncrementalFriendsReq) (*pbfriend.GetIncrementalFriendsResp, error) {
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
var limit int var limit int
if req.Version > 0 { if req.Version > 0 {
limit = s.config.RpcConfig.FriendSyncCount limit = s.config.RpcConfig.FriendSyncCount
@ -35,14 +39,15 @@ func (s *friendServer) IncrSyncFriends(ctx context.Context, req *pbfriend.IncrSy
return nil, err return nil, err
} }
if len(sortUserIDs) == 0 { if len(sortUserIDs) == 0 {
return &pbfriend.IncrSyncFriendsResp{ return &pbfriend.GetIncrementalFriendsResp{
Version: uint64(incrVer.Version), Version: uint64(incrVer.Version),
VersionID: dataver.VersionIDStr(incrVer.ID),
Full: true, Full: true,
SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), SyncCount: uint32(s.config.RpcConfig.FriendSyncCount),
}, nil }, nil
} }
var changes []*relation.FriendModel var changes []*relation.FriendModel
res := dataver.NewSyncResult(incrVer, sortUserIDs) res := dataver.NewSyncResult(incrVer, sortUserIDs, req.VersionID)
if len(res.Changes) > 0 { if len(res.Changes) > 0 {
changes, err = s.friendDatabase.FindFriendsWithError(ctx, req.UserID, res.Changes) changes, err = s.friendDatabase.FindFriendsWithError(ctx, req.UserID, res.Changes)
if err != nil { if err != nil {
@ -53,8 +58,9 @@ func (s *friendServer) IncrSyncFriends(ctx context.Context, req *pbfriend.IncrSy
if calcHash == req.IdHash { if calcHash == req.IdHash {
sortUserIDs = nil sortUserIDs = nil
} }
return &pbfriend.IncrSyncFriendsResp{ return &pbfriend.GetIncrementalFriendsResp{
Version: uint64(res.Version), Version: uint64(res.Version),
VersionID: res.VersionID,
Full: res.Full, Full: res.Full,
SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), SyncCount: uint32(s.config.RpcConfig.FriendSyncCount),
SortUserIdHash: calcHash, SortUserIdHash: calcHash,

@ -0,0 +1,16 @@
package group
import (
"context"
pbgroup "github.com/openimsdk/protocol/group"
)
func (s *groupServer) SearchGroupMember(ctx context.Context, req *pbgroup.SearchGroupMemberReq) (*pbgroup.SearchGroupMemberResp, error) {
//TODO implement me
panic("implement me")
}
func (s *groupServer) GetGroupMemberHash(ctx context.Context, req *pbgroup.GetGroupMemberHashReq) (*pbgroup.GetGroupMemberHashResp, error) {
//TODO implement me
panic("implement me")
}

@ -56,6 +56,11 @@ type userServer struct {
webhookClient *webhook.Client webhookClient *webhook.Client
} }
func (s *userServer) SearchUser(ctx context.Context, req *pbuser.SearchUserReq) (*pbuser.SearchUserResp, error) {
//TODO implement me
panic("implement me")
}
type Config struct { type Config struct {
RpcConfig config.User RpcConfig config.User
RedisConfig config.Redis RedisConfig config.Redis

@ -52,6 +52,27 @@ type Elem struct {
LastUpdate time.Time `bson:"last_update"` LastUpdate time.Time `bson:"last_update"`
} }
type tableWriteLog struct {
ID primitive.ObjectID `bson:"_id"`
DID string `bson:"d_id"`
Logs []Elem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
}
func (t *tableWriteLog) WriteLog() *WriteLog {
return &WriteLog{
ID: t.ID,
DID: t.DID,
Logs: t.Logs,
Version: t.Version,
Deleted: t.Deleted,
LastUpdate: t.LastUpdate,
LogLen: 0,
}
}
type DataLog interface { type DataLog interface {
WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error
FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error)
@ -94,7 +115,7 @@ func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, dele
if res.MatchedCount > 0 { if res.MatchedCount > 0 {
return nil return nil
} }
if err := l.initDoc(ctx, dId, eIds, deleted, now); err == nil { if _, err := l.initDoc(ctx, dId, eIds, deleted, now); err == nil {
return nil return nil
} else if !mongo.IsDuplicateKeyError(err) { } else if !mongo.IsDuplicateKeyError(err) {
return err return err
@ -107,15 +128,9 @@ func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, dele
return nil return nil
} }
func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) error { func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*tableWriteLog, error) {
type tableWriteLog struct {
DID string `bson:"d_id"`
Logs []Elem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
}
wl := tableWriteLog{ wl := tableWriteLog{
ID: primitive.NewObjectID(),
DID: dId, DID: dId,
Logs: make([]Elem, 0, len(eIds)), Logs: make([]Elem, 0, len(eIds)),
Version: FirstVersion, Version: FirstVersion,
@ -131,12 +146,12 @@ func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, delet
}) })
} }
_, err := l.coll.InsertOne(ctx, &wl) _, err := l.coll.InsertOne(ctx, &wl)
return err return &wl, err
} }
func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
if len(eIds) == 0 { if eIds == nil {
return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) eIds = []string{}
} }
filter := bson.M{ filter := bson.M{
"d_id": dId, "d_id": dId,
@ -195,17 +210,25 @@ func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string,
} }
func (l *logModel) findDoc(ctx context.Context, dId string) (*WriteLog, error) { func (l *logModel) findDoc(ctx context.Context, dId string) (*WriteLog, error) {
res, err := mongoutil.FindOne[*WriteLog](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0})) return mongoutil.FindOne[*WriteLog](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0}))
if err == nil { }
return res, nil
} else if errors.Is(err, mongo.ErrNoDocuments) { func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) {
return &WriteLog{}, nil if wl, err := l.findChangeLog(ctx, dId, version, limit); err == nil {
return wl, nil
} else if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, err
}
if res, err := l.initDoc(ctx, dId, nil, false, time.Now()); err == nil {
return res.WriteLog(), nil
} else if mongo.IsDuplicateKeyError(err) {
return l.findChangeLog(ctx, dId, version, limit)
} else { } else {
return nil, err return nil, err
} }
} }
func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { func (l *logModel) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) {
if version == 0 && limit == 0 { if version == 0 && limit == 0 {
return l.findDoc(ctx, dId) return l.findDoc(ctx, dId)
} }
@ -271,7 +294,7 @@ func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint,
return nil, err return nil, err
} }
if len(res) == 0 { if len(res) == 0 {
return &WriteLog{}, nil return nil, mongo.ErrNoDocuments
} }
return res[0], nil return res[0], nil
} }

@ -1,18 +1,29 @@
package dataver package dataver
import "github.com/openimsdk/tools/utils/datautil" import (
"github.com/openimsdk/tools/utils/datautil"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type SyncResult struct { type SyncResult struct {
Version uint Version uint
VersionID string
DeleteEID []string DeleteEID []string
Changes []string Changes []string
Full bool Full bool
} }
func NewSyncResult(wl *WriteLog, fullIDs []string) *SyncResult { func VersionIDStr(id primitive.ObjectID) string {
if id.IsZero() {
return ""
}
return id.String()
}
func NewSyncResult(wl *WriteLog, fullIDs []string, versionID string) *SyncResult {
var findEIDs []string var findEIDs []string
var res SyncResult var res SyncResult
if wl.Full() { if wl.Full() || VersionIDStr(wl.ID) != versionID {
res.Changes = fullIDs res.Changes = fullIDs
res.Full = true res.Full = true
} else { } else {

@ -193,20 +193,6 @@ func (f *FriendMgo) FindIncrVersion(ctx context.Context, ownerUserID string, ver
return f.owner.FindChangeLog(ctx, ownerUserID, version, limit) return f.owner.FindChangeLog(ctx, ownerUserID, version, limit)
} }
//func (f *FriendMgo) IncrSync(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.SyncResult[*relation.FriendModel], error) {
// res, err := f.owner.FindChangeLog(ctx, ownerUserID, version, limit)
// if err != nil {
// return nil, err
// }
// return dataver.NewSyncResult[*relation.FriendModel](res, func(eIds []string) ([]*relation.FriendModel, error) {
// if len(eIds) == 0 {
// return nil, errors.New("todo")
// } else {
// return f.FindFriends(ctx, ownerUserID, eIds)
// }
// })
//}
func IncrVersion(dbs ...func() error) error { func IncrVersion(dbs ...func() error) error {
for _, fn := range dbs { for _, fn := range dbs {
if err := fn(); err != nil { if err := fn(); err != nil {

Loading…
Cancel
Save