diff --git a/go.mod b/go.mod index c3d69b0fb..daf6cc0d2 100644 --- a/go.mod +++ b/go.mod @@ -178,6 +178,6 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect ) -//replace ( -// github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol -//) +replace ( + github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol +) diff --git a/internal/api/friend.go b/internal/api/friend.go index 1fea38b31..3af162a53 100644 --- a/internal/api/friend.go +++ b/internal/api/friend.go @@ -90,6 +90,11 @@ func (o *FriendApi) GetFriendIDs(c *gin.Context) { func (o *FriendApi) GetSpecifiedFriendsInfo(c *gin.Context) { a2r.Call(friend.FriendClient.GetSpecifiedFriendsInfo, o.Client, c) } + func (o *FriendApi) UpdateFriends(c *gin.Context) { a2r.Call(friend.FriendClient.UpdateFriends, o.Client, c) } + +func (o *FriendApi) GetIncrementalFriends(c *gin.Context) { + a2r.Call(friend.FriendClient.GetIncrementalFriends, o.Client, c) +} diff --git a/internal/api/router.go b/internal/api/router.go index 600567178..78e049e0f 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -86,6 +86,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs) friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo) friendRouterGroup.POST("/update_friends", f.UpdateFriends) + friendRouterGroup.POST("/get_incremental_friends", f.GetIncrementalFriends) } g := NewGroupApi(*groupRpc) groupRouterGroup := r.Group("/group") diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go index 1e74cbc0d..9d2a53b58 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/friend/sync.go @@ -5,6 +5,7 @@ import ( "crypto/md5" "encoding/binary" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" pbfriend "github.com/openimsdk/protocol/friend" @@ -21,7 +22,10 @@ func (s *friendServer) sortFriendUserIDsHash(userIDs []string) uint64 { return binary.BigEndian.Uint64(sum[:]) } -func (s *friendServer) IncrSyncFriends(ctx context.Context, req *pbfriend.IncrSyncFriendsReq) (*pbfriend.IncrSyncFriendsResp, error) { +func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *pbfriend.GetIncrementalFriendsReq) (*pbfriend.GetIncrementalFriendsResp, error) { + if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { + return nil, err + } var limit int if req.Version > 0 { limit = s.config.RpcConfig.FriendSyncCount @@ -35,14 +39,15 @@ func (s *friendServer) IncrSyncFriends(ctx context.Context, req *pbfriend.IncrSy return nil, err } if len(sortUserIDs) == 0 { - return &pbfriend.IncrSyncFriendsResp{ + return &pbfriend.GetIncrementalFriendsResp{ Version: uint64(incrVer.Version), + VersionID: dataver.VersionIDStr(incrVer.ID), Full: true, SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), }, nil } var changes []*relation.FriendModel - res := dataver.NewSyncResult(incrVer, sortUserIDs) + res := dataver.NewSyncResult(incrVer, sortUserIDs, req.VersionID) if len(res.Changes) > 0 { changes, err = s.friendDatabase.FindFriendsWithError(ctx, req.UserID, res.Changes) if err != nil { @@ -53,8 +58,9 @@ func (s *friendServer) IncrSyncFriends(ctx context.Context, req *pbfriend.IncrSy if calcHash == req.IdHash { sortUserIDs = nil } - return &pbfriend.IncrSyncFriendsResp{ + return &pbfriend.GetIncrementalFriendsResp{ Version: uint64(res.Version), + VersionID: res.VersionID, Full: res.Full, SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), SortUserIdHash: calcHash, diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go new file mode 100644 index 000000000..765bceb63 --- /dev/null +++ b/internal/rpc/group/sync.go @@ -0,0 +1,16 @@ +package group + +import ( + "context" + pbgroup "github.com/openimsdk/protocol/group" +) + +func (s *groupServer) SearchGroupMember(ctx context.Context, req *pbgroup.SearchGroupMemberReq) (*pbgroup.SearchGroupMemberResp, error) { + //TODO implement me + panic("implement me") +} + +func (s *groupServer) GetGroupMemberHash(ctx context.Context, req *pbgroup.GetGroupMemberHashReq) (*pbgroup.GetGroupMemberHashResp, error) { + //TODO implement me + panic("implement me") +} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index a28fa24e2..2296ee62f 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -56,6 +56,11 @@ type userServer struct { webhookClient *webhook.Client } +func (s *userServer) SearchUser(ctx context.Context, req *pbuser.SearchUserReq) (*pbuser.SearchUserResp, error) { + //TODO implement me + panic("implement me") +} + type Config struct { RpcConfig config.User RedisConfig config.Redis diff --git a/pkg/common/db/dataver/common.go b/pkg/common/db/dataver/common.go index c9a4e0419..3ff9906e5 100644 --- a/pkg/common/db/dataver/common.go +++ b/pkg/common/db/dataver/common.go @@ -52,6 +52,27 @@ type Elem struct { LastUpdate time.Time `bson:"last_update"` } +type tableWriteLog struct { + ID primitive.ObjectID `bson:"_id"` + DID string `bson:"d_id"` + Logs []Elem `bson:"logs"` + Version uint `bson:"version"` + Deleted uint `bson:"deleted"` + LastUpdate time.Time `bson:"last_update"` +} + +func (t *tableWriteLog) WriteLog() *WriteLog { + return &WriteLog{ + ID: t.ID, + DID: t.DID, + Logs: t.Logs, + Version: t.Version, + Deleted: t.Deleted, + LastUpdate: t.LastUpdate, + LogLen: 0, + } +} + type DataLog interface { WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) @@ -94,7 +115,7 @@ func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, dele if res.MatchedCount > 0 { return nil } - if err := l.initDoc(ctx, dId, eIds, deleted, now); err == nil { + if _, err := l.initDoc(ctx, dId, eIds, deleted, now); err == nil { return nil } else if !mongo.IsDuplicateKeyError(err) { return err @@ -107,15 +128,9 @@ func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, dele return nil } -func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) error { - type tableWriteLog struct { - DID string `bson:"d_id"` - Logs []Elem `bson:"logs"` - Version uint `bson:"version"` - Deleted uint `bson:"deleted"` - LastUpdate time.Time `bson:"last_update"` - } +func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*tableWriteLog, error) { wl := tableWriteLog{ + ID: primitive.NewObjectID(), DID: dId, Logs: make([]Elem, 0, len(eIds)), Version: FirstVersion, @@ -131,12 +146,12 @@ func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, delet }) } _, err := l.coll.InsertOne(ctx, &wl) - return err + return &wl, err } func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { - if len(eIds) == 0 { - return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) + if eIds == nil { + eIds = []string{} } filter := bson.M{ "d_id": dId, @@ -195,17 +210,25 @@ func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, } func (l *logModel) findDoc(ctx context.Context, dId string) (*WriteLog, error) { - res, err := mongoutil.FindOne[*WriteLog](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0})) - if err == nil { - return res, nil - } else if errors.Is(err, mongo.ErrNoDocuments) { - return &WriteLog{}, nil + return mongoutil.FindOne[*WriteLog](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0})) +} + +func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { + if wl, err := l.findChangeLog(ctx, dId, version, limit); err == nil { + return wl, nil + } else if !errors.Is(err, mongo.ErrNoDocuments) { + return nil, err + } + if res, err := l.initDoc(ctx, dId, nil, false, time.Now()); err == nil { + return res.WriteLog(), nil + } else if mongo.IsDuplicateKeyError(err) { + return l.findChangeLog(ctx, dId, version, limit) } else { return nil, err } } -func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { +func (l *logModel) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { if version == 0 && limit == 0 { return l.findDoc(ctx, dId) } @@ -271,7 +294,7 @@ func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, return nil, err } if len(res) == 0 { - return &WriteLog{}, nil + return nil, mongo.ErrNoDocuments } return res[0], nil } diff --git a/pkg/common/db/dataver/result.go b/pkg/common/db/dataver/result.go index 915a618d0..ec09ffa31 100644 --- a/pkg/common/db/dataver/result.go +++ b/pkg/common/db/dataver/result.go @@ -1,18 +1,29 @@ package dataver -import "github.com/openimsdk/tools/utils/datautil" +import ( + "github.com/openimsdk/tools/utils/datautil" + "go.mongodb.org/mongo-driver/bson/primitive" +) type SyncResult struct { Version uint + VersionID string DeleteEID []string Changes []string Full bool } -func NewSyncResult(wl *WriteLog, fullIDs []string) *SyncResult { +func VersionIDStr(id primitive.ObjectID) string { + if id.IsZero() { + return "" + } + return id.String() +} + +func NewSyncResult(wl *WriteLog, fullIDs []string, versionID string) *SyncResult { var findEIDs []string var res SyncResult - if wl.Full() { + if wl.Full() || VersionIDStr(wl.ID) != versionID { res.Changes = fullIDs res.Full = true } else { diff --git a/pkg/common/db/mgo/friend.go b/pkg/common/db/mgo/friend.go index bca6873c2..37ba2d7ac 100644 --- a/pkg/common/db/mgo/friend.go +++ b/pkg/common/db/mgo/friend.go @@ -193,20 +193,6 @@ func (f *FriendMgo) FindIncrVersion(ctx context.Context, ownerUserID string, ver return f.owner.FindChangeLog(ctx, ownerUserID, version, limit) } -//func (f *FriendMgo) IncrSync(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.SyncResult[*relation.FriendModel], error) { -// res, err := f.owner.FindChangeLog(ctx, ownerUserID, version, limit) -// if err != nil { -// return nil, err -// } -// return dataver.NewSyncResult[*relation.FriendModel](res, func(eIds []string) ([]*relation.FriendModel, error) { -// if len(eIds) == 0 { -// return nil, errors.New("todo") -// } else { -// return f.FindFriends(ctx, ownerUserID, eIds) -// } -// }) -//} - func IncrVersion(dbs ...func() error) error { for _, fn := range dbs { if err := fn(); err != nil {