From 641781c2b5b46f55d0044be45c2c5a4f7bc58b65 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 17 Jul 2024 17:08:40 +0800 Subject: [PATCH] feat: implement true BatchGetIncrGroupMember RPC method and corresponding dependency methods. --- internal/api/group.go | 117 +++++++++------ internal/rpc/group/sync.go | 137 +++++++++++++++++- internal/rpc/incrversion/batch_option.go | 65 ++++----- pkg/common/storage/cache/redis/group.go | 20 +-- pkg/common/storage/controller/group.go | 38 ++++- pkg/common/storage/database/group_member.go | 1 + .../storage/database/mgo/group_member.go | 5 + .../storage/database/mgo/version_log.go | 23 ++- pkg/common/storage/database/version_log.go | 4 +- 9 files changed, 320 insertions(+), 90 deletions(-) diff --git a/internal/api/group.go b/internal/api/group.go index e48191ee1..933a6eb7a 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -19,8 +19,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/group" "github.com/openimsdk/tools/a2r" - "github.com/openimsdk/tools/apiresp" - "github.com/openimsdk/tools/log" ) type GroupApi rpcclient.Group @@ -148,45 +146,82 @@ func (o *GroupApi) GetIncrementalGroupMember(c *gin.Context) { } func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) { - type BatchIncrementalReq struct { - UserID string `json:"user_id"` - List []*group.GetIncrementalGroupMemberReq `json:"list"` - } - type BatchIncrementalResp struct { - List map[string]*group.GetIncrementalGroupMemberResp `json:"list"` - } - req, err := a2r.ParseRequestNotCheck[BatchIncrementalReq](c) - if err != nil { - apiresp.GinError(c, err) - return - } - resp := &BatchIncrementalResp{ - List: make(map[string]*group.GetIncrementalGroupMemberResp), - } - var ( - changeCount int - ) - for _, req := range req.List { - if _, ok := resp.List[req.GroupID]; ok { - continue - } - res, err := o.Client.GetIncrementalGroupMember(c, req) - if err != nil { - if len(resp.List) == 0 { - apiresp.GinError(c, err) - } else { - log.ZError(c, "group incr sync versopn", err, "groupID", req.GroupID, "success", len(resp.List)) - apiresp.GinSuccess(c, resp) - } - return - } - resp.List[req.GroupID] = res - changeCount += len(res.Insert) + len(res.Delete) + len(res.Update) - if changeCount >= 200 { - break - } - } - apiresp.GinSuccess(c, resp) + a2r.Call(group.GroupClient.BatchGetIncrementalGroupMember, o.Client, c) + + // // OLd. Need Deprecated + // // type BatchIncrementalReq struct { + // // UserID string `json:"user_id"` + // // List []*group.GetIncrementalGroupMemberReq `json:"list"` + // // } + // // type BatchIncrementalResp struct { + // // List map[string]*group.GetIncrementalGroupMemberResp `json:"list"` + // // } + // // req, err := a2r.ParseRequestNotCheck[BatchIncrementalReq](c) + // // if err != nil { + // // apiresp.GinError(c, err) + // // return + // // } + // // resp := &BatchIncrementalResp{ + // // List: make(map[string]*group.GetIncrementalGroupMemberResp), + // // } + + // //**** Start ****// + // var ( + // changeCount int + // ) + // req, err := a2r.ParseRequestNotCheck[group.BatchGetIncrementalGroupMemberReq](c) + // if err != nil { + // apiresp.GinError(c, err) + // return + // } + // resp, err := o.Client.BatchGetIncrementalGroupMember(c, req) + // if err != nil { + // if len(resp.RespList) == 0 { + // apiresp.GinError(c, err) + // } else { + // log.ZError(c, "group incr sync version", err) + // apiresp.GinSuccess(c, resp) + // } + // } + + // res := make(map[string]*group.GetIncrementalGroupMemberResp) + // for k, val := range resp.RespList { + // changeCount += len(val.Insert) + len(val.Delete) + len(val.Update) + // if changeCount >= 200 { + // break + // } + // res[k] = val + // } + // resp = &group.BatchGetIncrementalGroupMemberResp{ + // RespList: res, + // } + + // // OLd. Need Deprecated + // // for _, req := range req.List { + // // if _, ok := resp.List[req.GroupID]; ok { + // // continue + // // } + // // res, err := o.Client.GetIncrementalGroupMember(c, req) + // // if err != nil { + // // if len(resp.List) == 0 { + // // apiresp.GinError(c, err) + // // } else { + // // log.ZError(c, "group incr sync version", err, "groupID", req.GroupID, "success", len(resp.List)) + // // apiresp.GinSuccess(c, resp) + // // } + // // return + // // } + // // resp.List[req.GroupID] = res + // // changeCount += len(res.Insert) + len(res.Delete) + len(res.Update) + // // if changeCount >= 200 { + // // break + // // } + // // } + + // apiresp.GinSuccess(c, resp) + + //**** End ****// + } func (o *GroupApi) GetFullGroupMemberUserIDs(c *gin.Context) { diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 75d060c0e..06ef8f671 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -2,6 +2,8 @@ package group import ( "context" + "slices" + "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -10,7 +12,7 @@ import ( "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" - "slices" + "github.com/openimsdk/tools/log" ) func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { @@ -91,7 +93,6 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { return s.getGroupMembersInfo(ctx, req.GroupID, ids) }, - ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID }, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { return &pbgroup.GetIncrementalGroupMemberResp{ VersionID: version.ID.Hex(), @@ -121,6 +122,137 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou return resp, nil } +func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (resp *pbgroup.BatchGetIncrementalGroupMemberResp, err error) { + type VersionInfo struct { + GroupID string + VersionID string + VersionNumber uint64 + } + + var groupIDs []string + groupVersionMap := make(map[string]*VersionInfo) + groupsMap := make(map[string]*model.Group) + + var targetKeys, versionIDs []string + var versionNumbers []uint64 + + // var requestBodyLen int + + for _, group := range req.ReqList { + groupVersionMap[group.GroupID] = &VersionInfo{ + GroupID: group.GroupID, + VersionID: group.VersionID, + VersionNumber: group.Version, + } + + groupIDs = append(groupIDs, group.GroupID) + } + + groups, err := s.db.FindGroup(ctx, groupIDs) + if err != nil { + return nil, err + } + + for _, group := range groups { + if group.Status == constant.GroupStatusDismissed { + err = servererrs.ErrDismissedAlready.Wrap() + log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID) + delete(groupVersionMap, group.GroupID) + } else { + groupsMap[group.GroupID] = group + // truegroupIDs = append(truegroupIDs, group.GroupID) + } + } + for key, val := range groupVersionMap { + targetKeys = append(targetKeys, key) + versionIDs = append(versionIDs, val.VersionID) + versionNumbers = append(versionNumbers, val.VersionNumber) + } + + var hasGroupUpdate map[string]bool + opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{ + Ctx: ctx, + TargetKeys: targetKeys, + VersionIDs: versionIDs, + VersionNumbers: versionNumbers, + Versions: func(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) { + vLogs, err := s.db.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits) + if err != nil { + return nil, err + } + for key, vlog := range vLogs { + vlog.Logs = slices.DeleteFunc(vlog.Logs, func(elem model.VersionLogElem) bool { + if elem.EID == "" { + vlog.LogLen-- + hasGroupUpdate[key] = true + return true + } + return false + }) + if vlog.LogLen > 0 { + hasGroupUpdate[key] = true + } + } + + return vLogs, nil + }, + CacheMaxVersions: s.db.BatchFindMaxGroupMemberVersionCache, + Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { + // memberInfoMap := make(map[string][]*sdkws.GroupMemberFullInfo) + // for _, groupID := range groupIDs { + memberInfo, err := s.getGroupMembersInfo(ctx, groupID, ids) + if err != nil { + return nil, err + } + // memberInfoMap:=datautil.SliceToMap(memberInfo, func(e *sdkws.GroupMemberFullInfo) string { + // return e.GroupID + // }) + // // memberInfoMap[groupID] = memberInfo + // // } + return memberInfo, err + }, + Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp { + resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) + + for key, version := range versions { + resList[key] = &pbgroup.GetIncrementalGroupMemberResp{ + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: fullMap[key], + Delete: deleteIdsMap[key], + Insert: insertListMap[key], + Update: updateListMap[key], + } + } + + return &pbgroup.BatchGetIncrementalGroupMemberResp{ + RespList: resList, + } + }, + } + + resp, err = opt.Build() + if err != nil { + return nil, err + } + for key, val := range resp.RespList { + if val.Full || hasGroupUpdate[key] { + count, err := s.db.FindGroupMemberNum(ctx, key) + if err != nil { + return nil, err + } + owner, err := s.db.TakeGroupOwner(ctx, key) + if err != nil { + return nil, err + } + resp.RespList[key].Group = s.groupDB2PB(groupsMap[key], owner.UserID, count) + } + } + + return resp, nil + +} + func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) { if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err @@ -133,7 +265,6 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. Version: s.db.FindJoinIncrVersion, CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache, Find: s.getGroupsInfo, - ID: func(elem *sdkws.GroupInfo) string { return elem.GroupID }, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp { return &pbgroup.GetIncrementalJoinGroupResp{ VersionID: version.ID.Hex(), diff --git a/internal/rpc/incrversion/batch_option.go b/internal/rpc/incrversion/batch_option.go index 1d0feb838..3c7d0a391 100644 --- a/internal/rpc/incrversion/batch_option.go +++ b/internal/rpc/incrversion/batch_option.go @@ -11,16 +11,15 @@ import ( type BatchOption[A, B any] struct { Ctx context.Context - VersionKeys []string + TargetKeys []string VersionIDs []string VersionNumbers []uint64 //SyncLimit int Versions func(ctx context.Context, dIds []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) CacheMaxVersions func(ctx context.Context, dIds []string) (map[string]*model.VersionLog, error) - //SortID func(ctx context.Context, dId string) ([]string, error) - Find func(ctx context.Context, ids []string) ([]A, error) + Find func(ctx context.Context, dId string, ids []string) (A, error) // Resp func(version map[string]*model.VersionLog, deleteIds, insertList, updateList []A, full bool) []*B - Resp func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]A, fullMap map[string]bool) *B + Resp func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string]A, fullMap map[string]bool) *B } func (o *BatchOption[A, B]) newError(msg string) error { @@ -31,8 +30,8 @@ func (o *BatchOption[A, B]) check() error { if o.Ctx == nil { return o.newError("opt ctx is nil") } - if len(o.VersionKeys) == 0 { - return o.newError("versionKeys is empty") + if len(o.TargetKeys) == 0 { + return o.newError("targetKeys is empty") } if o.Versions == nil { return o.newError("func versions is nil") @@ -64,17 +63,17 @@ func (o *BatchOption[A, B]) equalIDs(objIDs []primitive.ObjectID) []bool { } func (o *BatchOption[A, B]) getVersions(tags *[]int) (versions map[string]*model.VersionLog, err error) { - valids := o.validVersions() - var dIDs []string var versionNums []uint64 var limits []int + valids := o.validVersions() + if o.CacheMaxVersions == nil { for i, valid := range valids { if valid { (*tags)[i] = tagQuery - dIDs = append(dIDs, o.VersionKeys[i]) + dIDs = append(dIDs, o.TargetKeys[i]) versionNums = append(versionNums, o.VersionNumbers[i]) limits = append(limits, syncLimit) @@ -85,7 +84,7 @@ func (o *BatchOption[A, B]) getVersions(tags *[]int) (versions map[string]*model // versions[o.VersionKeys[i]] = version[o.VersionKeys[i]] } else { (*tags)[i] = tagFull - dIDs = append(dIDs, o.VersionKeys[i]) + dIDs = append(dIDs, o.TargetKeys[i]) versionNums = append(versionNums, 0) limits = append(limits, 0) @@ -102,7 +101,7 @@ func (o *BatchOption[A, B]) getVersions(tags *[]int) (versions map[string]*model } return versions, nil } else { - caches, err := o.CacheMaxVersions(o.Ctx, o.VersionKeys) + caches, err := o.CacheMaxVersions(o.Ctx, o.TargetKeys) if err != nil { return nil, err } @@ -119,16 +118,16 @@ func (o *BatchOption[A, B]) getVersions(tags *[]int) (versions map[string]*model } else if !equals[i] { (*tags)[i] = tagFull // versions[o.VersionKeys[i]] = caches[o.VersionKeys[i]] - } else if o.VersionNumbers[i] == uint64(caches[o.VersionKeys[i]].Version) { + } else if o.VersionNumbers[i] == uint64(caches[o.TargetKeys[i]].Version) { (*tags)[i] = tagEqual // versions[o.VersionKeys[i]] = caches[o.VersionKeys[i]] } else { (*tags)[i] = tagQuery - dIDs = append(dIDs, o.VersionKeys[i]) + dIDs = append(dIDs, o.TargetKeys[i]) versionNums = append(versionNums, o.VersionNumbers[i]) limits = append(limits, syncLimit) - delete(caches, o.VersionKeys[i]) + delete(caches, o.TargetKeys[i]) // versions[o.VersionKeys[i]] = version[o.VersionKeys[i]] } } @@ -149,7 +148,7 @@ func (o *BatchOption[A, B]) Build() (*B, error) { return nil, err } - tags := make([]int, len(o.VersionKeys)) + tags := make([]int, len(o.TargetKeys)) versions, err := o.getVersions(&tags) if err != nil { return nil, err @@ -159,12 +158,12 @@ func (o *BatchOption[A, B]) Build() (*B, error) { for i, tag := range tags { switch tag { case tagQuery: - version := versions[o.VersionKeys[i]] - fullMap[o.VersionKeys[i]] = version.ID.Hex() != o.VersionIDs[i] || uint64(version.Version) < o.VersionNumbers[i] || len(version.Logs) != version.LogLen + vLog := versions[o.TargetKeys[i]] + fullMap[o.TargetKeys[i]] = vLog.ID.Hex() != o.VersionIDs[i] || uint64(vLog.Version) < o.VersionNumbers[i] || len(vLog.Logs) != vLog.LogLen case tagFull: - fullMap[o.VersionKeys[i]] = true + fullMap[o.TargetKeys[i]] = true case tagEqual: - fullMap[o.VersionKeys[i]] = false + fullMap[o.TargetKeys[i]] = false default: panic(fmt.Errorf("undefined tag %d", tag)) } @@ -176,38 +175,38 @@ func (o *BatchOption[A, B]) Build() (*B, error) { updateIdsMap = make(map[string][]string) ) - for _, versionKey := range o.VersionKeys { - if !fullMap[versionKey] { - version := versions[versionKey] + for _, targetKey := range o.TargetKeys { + if !fullMap[targetKey] { + version := versions[targetKey] insertIds, deleteIds, updateIds := version.DeleteAndChangeIDs() - insertIdsMap[versionKey] = insertIds - deleteIdsMap[versionKey] = deleteIds - updateIdsMap[versionKey] = updateIds + insertIdsMap[targetKey] = insertIds + deleteIdsMap[targetKey] = deleteIds + updateIdsMap[targetKey] = updateIds } } var ( - insertListMap = make(map[string][]A) - updateListMap = make(map[string][]A) + insertListMap = make(map[string]A) + updateListMap = make(map[string]A) ) - for versionKey, insertIds := range insertIdsMap { + for targetKey, insertIds := range insertIdsMap { if len(insertIds) > 0 { - insertList, err := o.Find(o.Ctx, insertIds) + insertList, err := o.Find(o.Ctx, targetKey, insertIds) if err != nil { return nil, err } - insertListMap[versionKey] = insertList + insertListMap[targetKey] = insertList } } - for versionKey, updateIds := range updateIdsMap { + for targetKey, updateIds := range updateIdsMap { if len(updateIds) > 0 { - updateList, err := o.Find(o.Ctx, updateIds) + updateList, err := o.Find(o.Ctx, targetKey, updateIds) if err != nil { return nil, err } - updateListMap[versionKey] = updateList + updateListMap[targetKey] = updateList } } diff --git a/pkg/common/storage/cache/redis/group.go b/pkg/common/storage/cache/redis/group.go index d5af717f1..4764897bb 100644 --- a/pkg/common/storage/cache/redis/group.go +++ b/pkg/common/storage/cache/redis/group.go @@ -391,17 +391,17 @@ func (g *GroupCacheRedis) FindMaxGroupMemberVersion(ctx context.Context, groupID }) } -func (g *GroupCacheRedis) BatchFindMaxGroupMemberVersion(ctx context.Context, groupIDs []string) (versionLogs []*model.VersionLog, err error) { - for _, groupID := range groupIDs { - verionLog, err := getCache(ctx, g.rcClient, g.getGroupMemberMaxVersionKey(groupID), g.expireTime, func(ctx context.Context) (*model.VersionLog, error) { - return g.groupMemberDB.FindMemberIncrVersion(ctx, groupID, 0, 0) +func (g *GroupCacheRedis) BatchFindMaxGroupMemberVersion(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error) { + return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, + func(groupID string) string { + return g.getGroupMemberMaxVersionKey(groupID) + }, func(versionLog *model.VersionLog) string { + return versionLog.DID + }, func(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error) { + versions := make([]uint, len(groupIDs)) + limits := make([]int, len(groupIDs)) + return g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits) }) - if err != nil { - return nil, errs.Wrap(err) - } - versionLogs = append(versionLogs, verionLog) - } - return versionLogs, errs.Wrap(err) } func (g *GroupCacheRedis) FindMaxJoinGroupVersion(ctx context.Context, userID string) (*model.VersionLog, error) { diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 3a5f48d4c..de79e5374 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -16,17 +16,19 @@ package controller import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/common" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/tx" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" ) @@ -108,6 +110,7 @@ type GroupDatabase interface { DeleteGroupMemberHash(ctx context.Context, groupIDs []string) error FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) + BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error @@ -115,6 +118,7 @@ type GroupDatabase interface { //FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) FindMaxGroupMemberVersionCache(ctx context.Context, groupID string) (*model.VersionLog, error) + BatchFindMaxGroupMemberVersionCache(ctx context.Context, groupIDs []string) (map[string]*model.VersionLog, error) FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error) SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error) @@ -498,6 +502,24 @@ func (g *groupDatabase) FindMemberIncrVersion(ctx context.Context, groupID strin return g.groupMemberDB.FindMemberIncrVersion(ctx, groupID, version, limit) } +func (g *groupDatabase) BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) { + if len(groupIDs) == 0 { + return nil, nil + } + var uintVersions []uint + for _, version := range versions { + uintVersions = append(uintVersions, uint(version)) + } + versionLogs, err := g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, uintVersions, limits) + if err != nil { + return nil, errs.Wrap(err) + } + groupMemberIncrVersionsMap := datautil.SliceToMap(versionLogs, func(e *model.VersionLog) string { + return e.DID + }) + return groupMemberIncrVersionsMap, nil +} + func (g *groupDatabase) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { return g.groupMemberDB.FindJoinIncrVersion(ctx, userID, version, limit) } @@ -506,6 +528,20 @@ func (g *groupDatabase) FindMaxGroupMemberVersionCache(ctx context.Context, grou return g.cache.FindMaxGroupMemberVersion(ctx, groupID) } +func (g *groupDatabase) BatchFindMaxGroupMemberVersionCache(ctx context.Context, groupIDs []string) (map[string]*model.VersionLog, error) { + if len(groupIDs) == 0 { + return nil, nil + } + versionLogs, err := g.cache.BatchFindMaxGroupMemberVersion(ctx, groupIDs) + if err != nil { + return nil, errs.Wrap(err) + } + maxGroupMemberVersionsMap := datautil.SliceToMap(versionLogs, func(e *model.VersionLog) string { + return e.DID + }) + return maxGroupMemberVersionsMap, nil +} + func (g *groupDatabase) FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error) { return g.cache.FindMaxJoinGroupVersion(ctx, userID) } diff --git a/pkg/common/storage/database/group_member.go b/pkg/common/storage/database/group_member.go index 04f38bf27..0ddf0654c 100644 --- a/pkg/common/storage/database/group_member.go +++ b/pkg/common/storage/database/group_member.go @@ -41,5 +41,6 @@ type GroupMember interface { JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) + BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint, limits []int) ([]*model.VersionLog, error) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) } diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index e9a780f78..783bb11ad 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -226,6 +226,11 @@ func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID stri return g.member.FindChangeLog(ctx, groupID, version, limit) } +func (g *GroupMemberMgo) BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint, limits []int) ([]*model.VersionLog, error) { + log.ZDebug(ctx, "Batch find member incr version", "groupIDs", groupIDs, "versions", versions) + return g.member.BatchFindChangeLog(ctx, groupIDs, versions, limits) +} + func (g *GroupMemberMgo) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { log.ZDebug(ctx, "find join incr version", "userID", userID, "version", version) return g.join.FindChangeLog(ctx, userID, version, limit) diff --git a/pkg/common/storage/database/mgo/version_log.go b/pkg/common/storage/database/mgo/version_log.go index 8836742f0..3929d7031 100644 --- a/pkg/common/storage/database/mgo/version_log.go +++ b/pkg/common/storage/database/mgo/version_log.go @@ -3,6 +3,8 @@ package mgo import ( "context" "errors" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" @@ -13,7 +15,6 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "time" ) func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) { @@ -181,6 +182,26 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u } } +func (l *VersionLogMgo) BatchFindChangeLog(ctx context.Context, dIds []string, versions []uint, limits []int) (vLogs []*model.VersionLog, err error) { + for i := 0; i < len(dIds); i++ { + if vLog, err := l.findChangeLog(ctx, dIds[i], versions[i], limits[i]); err == nil { + vLogs = append(vLogs, vLog) + } else if !errors.Is(err, mongo.ErrNoDocuments) { + log.ZError(ctx, "findChangeLog error:", errs.Wrap(err)) + } + log.ZDebug(ctx, "init doc", "dId", dIds[i]) + if res, err := l.initDoc(ctx, dIds[i], nil, 0, time.Now()); err == nil { + log.ZDebug(ctx, "init doc success", "dId", dIds[i]) + vLogs = append(vLogs, res) + } else if mongo.IsDuplicateKeyError(err) { + l.findChangeLog(ctx, dIds[i], versions[i], limits[i]) + } else { + log.ZError(ctx, "init doc error:", errs.Wrap(err)) + } + } + return vLogs, errs.Wrap(err) +} + func (l *VersionLogMgo) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) { if version == 0 && limit == 0 { return l.findDoc(ctx, dId) diff --git a/pkg/common/storage/database/version_log.go b/pkg/common/storage/database/version_log.go index 9d7bcc172..28224a7c7 100644 --- a/pkg/common/storage/database/version_log.go +++ b/pkg/common/storage/database/version_log.go @@ -2,8 +2,9 @@ package database import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) const ( @@ -14,6 +15,7 @@ const ( type VersionLog interface { IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) + BatchFindChangeLog(ctx context.Context, dIds []string, versions []uint, limits []int) ([]*model.VersionLog, error) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error Delete(ctx context.Context, dId string) error }