From 2e433fb9f31c7b05514a6829e017f873e4e67efa Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 18 Jul 2024 21:43:08 +0800 Subject: [PATCH] optimize method structures. --- internal/api/group.go | 75 ------------------------ internal/rpc/group/sync.go | 64 ++++++++++---------- internal/rpc/incrversion/batch_option.go | 26 +------- pkg/common/storage/cache/redis/group.go | 2 + pkg/common/storage/controller/group.go | 7 ++- 5 files changed, 41 insertions(+), 133 deletions(-) diff --git a/internal/api/group.go b/internal/api/group.go index 933a6eb7a..bff008974 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -147,81 +147,6 @@ func (o *GroupApi) GetIncrementalGroupMember(c *gin.Context) { func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) { a2r.Call(group.GroupClient.BatchGetIncrementalGroupMember, o.Client, c) - - // // OLd. Need Deprecated - // // type BatchIncrementalReq struct { - // // UserID string `json:"user_id"` - // // List []*group.GetIncrementalGroupMemberReq `json:"list"` - // // } - // // type BatchIncrementalResp struct { - // // List map[string]*group.GetIncrementalGroupMemberResp `json:"list"` - // // } - // // req, err := a2r.ParseRequestNotCheck[BatchIncrementalReq](c) - // // if err != nil { - // // apiresp.GinError(c, err) - // // return - // // } - // // resp := &BatchIncrementalResp{ - // // List: make(map[string]*group.GetIncrementalGroupMemberResp), - // // } - - // //**** Start ****// - // var ( - // changeCount int - // ) - // req, err := a2r.ParseRequestNotCheck[group.BatchGetIncrementalGroupMemberReq](c) - // if err != nil { - // apiresp.GinError(c, err) - // return - // } - // resp, err := o.Client.BatchGetIncrementalGroupMember(c, req) - // if err != nil { - // if len(resp.RespList) == 0 { - // apiresp.GinError(c, err) - // } else { - // log.ZError(c, "group incr sync version", err) - // apiresp.GinSuccess(c, resp) - // } - // } - - // res := make(map[string]*group.GetIncrementalGroupMemberResp) - // for k, val := range resp.RespList { - // changeCount += len(val.Insert) + len(val.Delete) + len(val.Update) - // if changeCount >= 200 { - // break - // } - // res[k] = val - // } - // resp = &group.BatchGetIncrementalGroupMemberResp{ - // RespList: res, - // } - - // // OLd. Need Deprecated - // // for _, req := range req.List { - // // if _, ok := resp.List[req.GroupID]; ok { - // // continue - // // } - // // res, err := o.Client.GetIncrementalGroupMember(c, req) - // // if err != nil { - // // if len(resp.List) == 0 { - // // apiresp.GinError(c, err) - // // } else { - // // log.ZError(c, "group incr sync version", err, "groupID", req.GroupID, "success", len(resp.List)) - // // apiresp.GinSuccess(c, resp) - // // } - // // return - // // } - // // resp.List[req.GroupID] = res - // // changeCount += len(res.Insert) + len(res.Delete) + len(res.Update) - // // if changeCount >= 200 { - // // break - // // } - // // } - - // apiresp.GinSuccess(c, resp) - - //**** End ****// - } func (o *GroupApi) GetFullGroupMemberUserIDs(c *gin.Context) { diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 06ef8f671..bddcbd7cd 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -130,8 +130,10 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p } var groupIDs []string - groupVersionMap := make(map[string]*VersionInfo) + + groupsVersionMap := make(map[string]*VersionInfo) groupsMap := make(map[string]*model.Group) + hasGroupUpdateMap := make(map[string]bool) var targetKeys, versionIDs []string var versionNumbers []uint64 @@ -139,7 +141,7 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p // var requestBodyLen int for _, group := range req.ReqList { - groupVersionMap[group.GroupID] = &VersionInfo{ + groupsVersionMap[group.GroupID] = &VersionInfo{ GroupID: group.GroupID, VersionID: group.VersionID, VersionNumber: group.Version, @@ -157,19 +159,19 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p if group.Status == constant.GroupStatusDismissed { err = servererrs.ErrDismissedAlready.Wrap() log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID) - delete(groupVersionMap, group.GroupID) + + delete(groupsVersionMap, group.GroupID) } else { groupsMap[group.GroupID] = group - // truegroupIDs = append(truegroupIDs, group.GroupID) } } - for key, val := range groupVersionMap { - targetKeys = append(targetKeys, key) - versionIDs = append(versionIDs, val.VersionID) - versionNumbers = append(versionNumbers, val.VersionNumber) + + for groupID, vInfo := range groupsVersionMap { + targetKeys = append(targetKeys, groupID) + versionIDs = append(versionIDs, vInfo.VersionID) + versionNumbers = append(versionNumbers, vInfo.VersionNumber) } - var hasGroupUpdate map[string]bool opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{ Ctx: ctx, TargetKeys: targetKeys, @@ -180,17 +182,18 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p if err != nil { return nil, err } - for key, vlog := range vLogs { + + for groupID, vlog := range vLogs { vlog.Logs = slices.DeleteFunc(vlog.Logs, func(elem model.VersionLogElem) bool { if elem.EID == "" { vlog.LogLen-- - hasGroupUpdate[key] = true + hasGroupUpdateMap[groupID] = true return true } return false }) if vlog.LogLen > 0 { - hasGroupUpdate[key] = true + hasGroupUpdateMap[groupID] = true } } @@ -198,30 +201,24 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p }, CacheMaxVersions: s.db.BatchFindMaxGroupMemberVersionCache, Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { - // memberInfoMap := make(map[string][]*sdkws.GroupMemberFullInfo) - // for _, groupID := range groupIDs { memberInfo, err := s.getGroupMembersInfo(ctx, groupID, ids) if err != nil { return nil, err } - // memberInfoMap:=datautil.SliceToMap(memberInfo, func(e *sdkws.GroupMemberFullInfo) string { - // return e.GroupID - // }) - // // memberInfoMap[groupID] = memberInfo - // // } + return memberInfo, err }, Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp { resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) - for key, version := range versions { - resList[key] = &pbgroup.GetIncrementalGroupMemberResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: fullMap[key], - Delete: deleteIdsMap[key], - Insert: insertListMap[key], - Update: updateListMap[key], + for groupID, versionLog := range versions { + resList[groupID] = &pbgroup.GetIncrementalGroupMemberResp{ + VersionID: versionLog.ID.Hex(), + Version: uint64(versionLog.Version), + Full: fullMap[groupID], + Delete: deleteIdsMap[groupID], + Insert: insertListMap[groupID], + Update: updateListMap[groupID], } } @@ -235,17 +232,20 @@ func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *p if err != nil { return nil, err } - for key, val := range resp.RespList { - if val.Full || hasGroupUpdate[key] { - count, err := s.db.FindGroupMemberNum(ctx, key) + + for groupID, val := range resp.RespList { + if val.Full || hasGroupUpdateMap[groupID] { + count, err := s.db.FindGroupMemberNum(ctx, groupID) if err != nil { return nil, err } - owner, err := s.db.TakeGroupOwner(ctx, key) + + owner, err := s.db.TakeGroupOwner(ctx, groupID) if err != nil { return nil, err } - resp.RespList[key].Group = s.groupDB2PB(groupsMap[key], owner.UserID, count) + + resp.RespList[groupID].Group = s.groupDB2PB(groupsMap[groupID], owner.UserID, count) } } diff --git a/internal/rpc/incrversion/batch_option.go b/internal/rpc/incrversion/batch_option.go index 3c7d0a391..7fbe70c90 100644 --- a/internal/rpc/incrversion/batch_option.go +++ b/internal/rpc/incrversion/batch_option.go @@ -18,8 +18,7 @@ type BatchOption[A, B any] struct { Versions func(ctx context.Context, dIds []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) CacheMaxVersions func(ctx context.Context, dIds []string) (map[string]*model.VersionLog, error) Find func(ctx context.Context, dId string, ids []string) (A, error) - // Resp func(version map[string]*model.VersionLog, deleteIds, insertList, updateList []A, full bool) []*B - Resp func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string]A, fullMap map[string]bool) *B + Resp func(versionsMap map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string]A, fullMap map[string]bool) *B } func (o *BatchOption[A, B]) newError(msg string) error { @@ -212,26 +211,3 @@ func (o *BatchOption[A, B]) Build() (*B, error) { return o.Resp(versions, deleteIdsMap, insertListMap, updateListMap, fullMap), nil } - -// for _, versionLog := range versionLogs { -// if versionLog != nil { -// if !full { - -// } -// insertIds, deleteIds, updateIds = append(insertIds, versionLog.InsertID...), append(deleteIds, versionLog.DeleteIDs...), append(updateIds, versionLog.UpdateIDs...) -// } -// } - -// insertList, err := o.Find(o.Ctx, insertIds) -// if err != nil { -// return nil, err -// } - -// updateList, err := o.Find(o.Ctx, updateIds) -// if err != nil { -// return nil, err -// } - -// full := len(insertIds) > 0 || len(updateIds) > 0 - -// return o.Resp(versionLogs, deleteIds, insertList, updateList, full), nil diff --git a/pkg/common/storage/cache/redis/group.go b/pkg/common/storage/cache/redis/group.go index 4764897bb..736111df3 100644 --- a/pkg/common/storage/cache/redis/group.go +++ b/pkg/common/storage/cache/redis/group.go @@ -398,8 +398,10 @@ func (g *GroupCacheRedis) BatchFindMaxGroupMemberVersion(ctx context.Context, gr }, func(versionLog *model.VersionLog) string { return versionLog.DID }, func(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error) { + // create two slices with len is groupIDs, just need 0 versions := make([]uint, len(groupIDs)) limits := make([]int, len(groupIDs)) + return g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits) }) } diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index de79e5374..896b3b9c1 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -504,19 +504,24 @@ func (g *groupDatabase) FindMemberIncrVersion(ctx context.Context, groupID strin func (g *groupDatabase) BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) { if len(groupIDs) == 0 { - return nil, nil + return nil, errs.New("groupIDs is nil.") } + + // convert []uint64 to []uint var uintVersions []uint for _, version := range versions { uintVersions = append(uintVersions, uint(version)) } + versionLogs, err := g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, uintVersions, limits) if err != nil { return nil, errs.Wrap(err) } + groupMemberIncrVersionsMap := datautil.SliceToMap(versionLogs, func(e *model.VersionLog) string { return e.DID }) + return groupMemberIncrVersionsMap, nil }