pull/2336/head
withchao 1 year ago
parent a1523f47e7
commit 58c4c13cf1

@ -179,10 +179,18 @@ func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) {
return
}
resp.List[req.GroupID] = res
changeCount += len(res.Changes) + len(res.DeleteUserIds)
if changeCount >= int(res.SyncCount) {
changeCount += len(res.Insert) + len(res.Delete) + len(res.Update)
if changeCount >= 200 {
break
}
}
apiresp.GinSuccess(c, resp)
}
func (o *GroupApi) GetIncrementalGroupMemberUserIDs(c *gin.Context) {
a2r.Call(group.GroupClient.GetIncrementalGroupMemberUserIDs, o.Client, c)
}
func (o *GroupApi) GetIncrementalJoinGroupIDs(c *gin.Context) {
a2r.Call(group.GroupClient.GetIncrementalJoinGroupIDs, o.Client, c)
}

@ -120,6 +120,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
groupRouterGroup.POST("/get_incremental_join_group", g.GetIncrementalJoinGroup)
groupRouterGroup.POST("/get_incremental_group_member", g.GetIncrementalGroupMember)
groupRouterGroup.POST("/get_incremental_group_member_batch", g.GetIncrementalGroupMemberBatch)
groupRouterGroup.POST("/get_incremental_group_member_user_ids", g.GetIncrementalGroupMemberUserIDs)
groupRouterGroup.POST("/get_incremental_join_group_ids", g.GetIncrementalJoinGroupIDs)
}
// certificate
authRouterGroup := r.Group("/auth")

@ -64,9 +64,6 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
if config.RpcConfig.FriendSyncCount < 1 {
config.RpcConfig.FriendSyncCount = constant.MaxSyncPullNumber
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err

@ -25,14 +25,14 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
return s.getFriend(ctx, req.UserID, ids)
},
ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID },
Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
return &relation.GetIncrementalFriendsResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
SyncCount: uint32(s.config.RpcConfig.FriendSyncCount),
DeleteUserIds: delIDs,
Changes: list,
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: deleteIds,
Insert: insertList,
Update: updateList,
}
},
}

@ -77,9 +77,6 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
if config.RpcConfig.GroupSyncCount <= 0 {
config.RpcConfig.GroupSyncCount = constant.MaxSyncPullNumber
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
@ -104,7 +101,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
var gs groupServer
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs), config.RpcConfig.GroupSyncCount)
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.user = userRpcClient
gs.notification = NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {

@ -2,6 +2,9 @@ package group
import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -9,6 +12,57 @@ import (
"github.com/openimsdk/protocol/sdkws"
)
func (s *groupServer) idHash(ids []string) uint64 {
if len(ids) == 0 {
return 0
}
data, _ := json.Marshal(ids)
sum := md5.Sum(data)
return binary.BigEndian.Uint64(sum[:])
}
func (s *groupServer) GetIncrementalGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberUserIDsReq) (*pbgroup.GetIncrementalGroupMemberUserIDsResp, error) {
vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
if err != nil {
return nil, err
}
userIDs, err := s.db.FindGroupMemberUserID(ctx, req.GroupID)
if err != nil {
return nil, err
}
idHash := s.idHash(userIDs)
if req.IdHash == idHash {
userIDs = nil
}
return &pbgroup.GetIncrementalGroupMemberUserIDsResp{
Version: idHash,
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
UserIDs: userIDs,
}, nil
}
func (s *groupServer) GetIncrementalJoinGroupIDs(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupIDsReq) (*pbgroup.GetIncrementalJoinGroupIDsResp, error) {
vl, err := s.db.FindMaxJoinGroupVersionCache(ctx, req.UserID)
if err != nil {
return nil, err
}
groupIDs, err := s.db.FindJoinGroupID(ctx, req.UserID)
if err != nil {
return nil, err
}
idHash := s.idHash(groupIDs)
if req.IdHash == idHash {
groupIDs = nil
}
return &pbgroup.GetIncrementalJoinGroupIDsResp{
Version: idHash,
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
GroupIDs: groupIDs,
}, nil
}
func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) {
opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
Ctx: ctx,
@ -21,14 +75,14 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
return s.getGroupMembersInfo(ctx, req.GroupID, ids)
},
ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID },
Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
return &pbgroup.GetIncrementalGroupMemberResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
SyncCount: uint32(s.config.RpcConfig.GroupSyncCount),
DeleteUserIds: delIDs,
Changes: list,
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: delIDs,
Insert: insertList,
Update: updateList,
}
},
}
@ -48,14 +102,14 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.
CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache,
Find: s.getGroupsInfo,
ID: func(elem *sdkws.GroupInfo) string { return elem.GroupID },
Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp {
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp {
return &pbgroup.GetIncrementalJoinGroupResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
SyncCount: uint32(s.config.RpcConfig.GroupSyncCount),
DeleteGroupIds: delIDs,
Changes: list,
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: delIDs,
Insert: insertList,
Update: updateList,
}
},
}

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@ -35,7 +34,7 @@ type Option[A, B any] struct {
//SortID func(ctx context.Context, dId string) ([]string, error)
Find func(ctx context.Context, ids []string) ([]A, error)
ID func(elem A) string
Resp func(version *model.VersionLog, delIDs []string, list []A, full bool) *B
Resp func(version *model.VersionLog, deleteIds []string, insertList, updateList []A, full bool) *B
}
func (o *Option[A, B]) newError(msg string) error {
@ -130,31 +129,28 @@ func (o *Option[A, B]) Build() (*B, error) {
panic(fmt.Errorf("undefined tag %d", tag))
}
var (
deleteIDs []string
changeIDs []string
insertIds []string
deleteIds []string
updateIds []string
)
if full {
//changeIDs, err = o.SortID(o.Ctx, o.VersionKey)
//if err != nil {
// return nil, err
//}
} else {
deleteIDs, changeIDs = version.DeleteAndChangeIDs()
if !full {
insertIds, deleteIds, updateIds = version.DeleteAndChangeIDs()
}
var list []A
if len(changeIDs) > 0 {
list, err = o.Find(o.Ctx, changeIDs)
var (
insertList []A
updateList []A
)
if len(insertIds) > 0 {
insertList, err = o.Find(o.Ctx, insertIds)
if err != nil {
return nil, err
}
if (!full) && o.ID != nil && len(changeIDs) != len(list) {
foundIDs := datautil.SliceSetAny(list, o.ID)
for _, id := range changeIDs {
if _, ok := foundIDs[id]; !ok {
deleteIDs = append(deleteIDs, id)
}
}
}
if len(updateIds) > 0 {
updateList, err = o.Find(o.Ctx, updateIds)
if err != nil {
return nil, err
}
}
return o.Resp(version, deleteIDs, list, full), nil
return o.Resp(version, deleteIds, insertList, updateList, full), nil
}

@ -244,8 +244,7 @@ type Friend struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
FriendSyncCount int `mapstructure:"friendSyncCount"`
Prometheus Prometheus `mapstructure:"prometheus"`
}
type Group struct {
@ -254,8 +253,7 @@ type Group struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
GroupSyncCount int `mapstructure:"groupSyncCount"`
Prometheus Prometheus `mapstructure:"prometheus"`
}
type Msg struct {

@ -45,7 +45,6 @@ type GroupCacheRedis struct {
expireTime time.Duration
rcClient *rockscache.Client
groupHash cache.GroupHash
syncCount int
}
func NewGroupCacheRedis(
@ -56,7 +55,6 @@ func NewGroupCacheRedis(
groupRequestDB database.GroupRequest,
hashCode cache.GroupHash,
opts *rockscache.Options,
syncCount int,
) cache.GroupCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Group.Topic})
g := localCache.Group
@ -70,7 +68,6 @@ func NewGroupCacheRedis(
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
groupHash: hashCode,
syncCount: syncCount,
}
}

@ -117,6 +117,8 @@ type GroupDatabase interface {
FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error)
SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error)
FindJoinGroupID(ctx context.Context, userID string) ([]string, error)
}
func NewGroupDatabase(
@ -127,14 +129,13 @@ func NewGroupDatabase(
groupRequestDB database.GroupRequest,
ctxTx tx.Tx,
groupHash cache.GroupHash,
syncCount int,
) GroupDatabase {
return &groupDatabase{
groupDB: groupDB,
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
ctxTx: ctxTx,
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions(), syncCount),
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()),
}
}
@ -146,6 +147,10 @@ type groupDatabase struct {
cache cache.GroupCache
}
func (g *groupDatabase) FindJoinGroupID(ctx context.Context, userID string) ([]string, error) {
return g.cache.GetJoinedGroupIDs(ctx, userID)
}
func (g *groupDatabase) FindGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupMember, error) {
return g.cache.GetGroupMembersInfo(ctx, groupID, userIDs)
}
@ -243,7 +248,7 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma
return err
}
for _, userID := range userIDs {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, false); err != nil {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, model.VersionStateUpdate); err != nil {
return err
}
@ -276,7 +281,7 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete
}
c = c.DelMaxJoinGroupVersion(userIDs...)
if len(userIDs) > 0 {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, true); err != nil {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, model.VersionStateDelete); err != nil {
return err
}
}

@ -34,7 +34,7 @@ type GroupMember interface {
TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error)
FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
IsUpdateRoleLevel(data map[string]any) bool
JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, deleted bool) error
JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error
FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error)
FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
}

@ -66,7 +66,7 @@ func (f *FriendMgo) Create(ctx context.Context, friends []*model.Friend) error {
mp[friend.OwnerUserID] = append(mp[friend.OwnerUserID], friend.FriendUserID)
}
for ownerUserID, friendUserIDs := range mp {
if err := f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, false); err != nil {
if err := f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateInsert); err != nil {
return err
}
}
@ -83,7 +83,7 @@ func (f *FriendMgo) Delete(ctx context.Context, ownerUserID string, friendUserID
return mongoutil.IncrVersion(func() error {
return mongoutil.DeleteOne(ctx, f.coll, filter)
}, func() error {
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, true)
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateDelete)
})
}
@ -99,7 +99,7 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU
return mongoutil.IncrVersion(func() error {
return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true)
}, func() error {
return f.owner.IncrVersion(ctx, ownerUserID, []string{friendUserID}, false)
return f.owner.IncrVersion(ctx, ownerUserID, []string{friendUserID}, model.VersionStateUpdate)
})
}
@ -189,7 +189,7 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien
return mongoutil.IncrVersion(func() error {
return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update))
}, func() error {
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, false)
return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate)
})
}

@ -70,7 +70,7 @@ func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.Group
gms[member.GroupID] = append(gms[member.GroupID], member.UserID)
}
for groupID, userIDs := range gms {
if err := g.member.IncrVersion(ctx, groupID, userIDs, false); err != nil {
if err := g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateInsert); err != nil {
return err
}
}
@ -81,7 +81,7 @@ func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.Group
gms[member.UserID] = append(gms[member.UserID], member.GroupID)
}
for userID, groupIDs := range gms {
if err := g.join.IncrVersion(ctx, userID, groupIDs, false); err != nil {
if err := g.join.IncrVersion(ctx, userID, groupIDs, model.VersionStateInsert); err != nil {
return err
}
}
@ -97,12 +97,12 @@ func (g *GroupMemberMgo) Delete(ctx context.Context, groupID string, userIDs []s
return mongoutil.IncrVersion(func() error {
return mongoutil.DeleteMany(ctx, g.coll, filter)
}, func() error {
return g.member.IncrVersion(ctx, groupID, userIDs, true)
return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateDelete)
}, func() error {
if len(userIDs) == 0 {
return nil
}
return g.member.IncrVersion(ctx, groupID, userIDs, true)
return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateDelete)
})
}
@ -110,9 +110,9 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us
return mongoutil.IncrVersion(func() error {
return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel})
}, func() error {
return g.member.IncrVersion(ctx, groupID, []string{userID}, true)
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
}, func() error {
return g.join.IncrVersion(ctx, groupID, []string{userID}, true)
return g.join.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
})
}
@ -123,7 +123,7 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri
return mongoutil.IncrVersion(func() error {
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true)
}, func() error {
return g.member.IncrVersion(ctx, groupID, []string{userID}, false)
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
})
}
@ -174,8 +174,8 @@ func (g *GroupMemberMgo) IsUpdateRoleLevel(data map[string]any) bool {
return ok
}
func (g *GroupMemberMgo) JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, deleted bool) error {
return g.join.IncrVersion(ctx, userID, groupIDs, deleted)
func (g *GroupMemberMgo) JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error {
return g.join.IncrVersion(ctx, userID, groupIDs, state)
}
func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) {

@ -36,7 +36,7 @@ func (l *VersionLogMgo) initIndex(ctx context.Context) error {
return err
}
func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []string, deleted bool) error {
func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error {
if len(eIds) == 0 {
return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
}
@ -44,19 +44,19 @@ func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []stri
return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds)
}
now := time.Now()
res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now)
res, err := l.writeLogBatch(ctx, dId, eIds, state, now)
if err != nil {
return err
}
if res.MatchedCount > 0 {
return nil
}
if _, err := l.initDoc(ctx, dId, eIds, deleted, now); err == nil {
if _, err := l.initDoc(ctx, dId, eIds, state, now); err == nil {
return nil
} else if !mongo.IsDuplicateKeyError(err) {
return err
}
if res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now); err != nil {
if res, err := l.writeLogBatch(ctx, dId, eIds, state, now); err != nil {
return err
} else if res.MatchedCount == 0 {
return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds)
@ -64,7 +64,7 @@ func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []stri
return nil
}
func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*model.VersionLogTable, error) {
func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLogTable, error) {
wl := model.VersionLogTable{
ID: primitive.NewObjectID(),
DID: dId,
@ -76,7 +76,7 @@ func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string,
for _, eId := range eIds {
wl.Logs = append(wl.Logs, model.VersionLogElem{
EID: eId,
Deleted: deleted,
State: state,
Version: database.FirstVersion,
LastUpdate: now,
})
@ -85,7 +85,7 @@ func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string,
return &wl, err
}
func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*mongo.UpdateResult, error) {
if eIds == nil {
eIds = []string{}
}
@ -97,7 +97,7 @@ func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []st
elems = append(elems, bson.M{
"e_id": eId,
"version": "$version",
"deleted": deleted,
"state": state,
"last_update": now,
})
}
@ -159,7 +159,7 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u
} else if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, err
}
if res, err := l.initDoc(ctx, dId, nil, false, time.Now()); err == nil {
if res, err := l.initDoc(ctx, dId, nil, 0, time.Now()); err == nil {
return res.VersionLog(), nil
} else if mongo.IsDuplicateKeyError(err) {
return l.findChangeLog(ctx, dId, version, limit)

@ -12,7 +12,7 @@ const (
)
type VersionLog interface {
IncrVersion(ctx context.Context, dId string, eIds []string, deleted bool) error
IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error
FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error)
DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error
}

@ -1,13 +1,22 @@
package model
import (
"context"
"errors"
"github.com/openimsdk/tools/log"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
const (
VersionStateInsert = iota + 1
VersionStateDelete
VersionStateUpdate
)
type VersionLogElem struct {
EID string `bson:"e_id"`
Deleted bool `bson:"deleted"`
State int32 `bson:"state"`
Version uint `bson:"version"`
LastUpdate time.Time `bson:"last_update"`
}
@ -43,12 +52,17 @@ type VersionLog struct {
LogLen int `bson:"log_len"`
}
func (v *VersionLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) {
func (v *VersionLog) DeleteAndChangeIDs() (insertIds, deleteIds, updateIds []string) {
for _, l := range v.Logs {
if l.Deleted {
delIds = append(delIds, l.EID)
} else {
changeIds = append(changeIds, l.EID)
switch l.State {
case VersionStateInsert:
insertIds = append(insertIds, l.EID)
case VersionStateDelete:
deleteIds = append(deleteIds, l.EID)
case VersionStateUpdate:
updateIds = append(updateIds, l.EID)
default:
log.ZError(context.Background(), "invalid version status found", errors.New("dirty database data"), "objID", v.ID.Hex(), "did", v.DID, "elem", l)
}
}
return

Loading…
Cancel
Save