diff --git a/go.mod b/go.mod index 4f6790507..ccd9885d0 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.77 + github.com/openimsdk/protocol v0.0.72-alpha.78 github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 7db0218c2..18d2a6495 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.77 h1:ka7TeOpNKKqYTz8aWR8nZnz2rnzgsO9mw6xOQcVXwbE= -github.com/openimsdk/protocol v0.0.72-alpha.77/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/protocol v0.0.72-alpha.78 h1:n9HVj5olMPiGLF3Z4apPvvYzn2yOpyrsn2/YiAaIsxw= +github.com/openimsdk/protocol v0.0.72-alpha.78/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/rpc/group/db_map.go b/internal/rpc/group/db_map.go index e99f9e772..7504bc851 100644 --- a/internal/rpc/group/db_map.go +++ b/internal/rpc/group/db_map.go @@ -16,6 +16,7 @@ package group import ( "context" + "strings" "time" pbgroup "github.com/openimsdk/protocol/group" @@ -55,41 +56,52 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s return m } -func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (map[string]any, error) { - m := make(map[string]any) +func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (m map[string]any, normalFlag, groupNameFlag, notificationFlag bool, err error) { + m = make(map[string]any) if group.GroupName != nil { - if group.GroupName.Value != "" { + if strings.TrimSpace(group.GroupName.Value) != "" { m["group_name"] = group.GroupName.Value + groupNameFlag = true } else { - return nil, errs.ErrArgs.WrapMsg("group name is empty") + return nil, normalFlag, notificationFlag, groupNameFlag, errs.ErrArgs.WrapMsg("group name is empty") } } + if group.Notification != nil { + notificationFlag = true + group.Notification.Value = strings.TrimSpace(group.Notification.Value) // if Notification only contains spaces, set it to empty string + m["notification"] = group.Notification.Value - m["notification_update_time"] = time.Now() m["notification_user_id"] = mcontext.GetOpUserID(ctx) + m["notification_update_time"] = time.Now() } if group.Introduction != nil { m["introduction"] = group.Introduction.Value + normalFlag = true } if group.FaceURL != nil { m["face_url"] = group.FaceURL.Value + normalFlag = true } if group.NeedVerification != nil { m["need_verification"] = group.NeedVerification.Value + normalFlag = true } if group.LookMemberInfo != nil { m["look_member_info"] = group.LookMemberInfo.Value + normalFlag = true } if group.ApplyMemberFriend != nil { m["apply_member_friend"] = group.ApplyMemberFriend.Value + normalFlag = true } if group.Ex != nil { m["ex"] = group.Ex.Value + normalFlag = true } - return m, nil + return m, normalFlag, groupNameFlag, notificationFlag, nil } func UpdateGroupStatusMap(status int) map[string]any { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index c256d23f9..4d4b58f6b 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -298,10 +298,11 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR g.notification.GroupCreatedNotification(ctx, tips, req.SendNotification) if req.GroupInfo.Notification != "" { + notificationFlag := true g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{ Group: tips.Group, OpUser: tips.OpUser, - }) + }, ¬ificationFlag) } reqCallBackAfter := &pbgroup.CreateGroupReq{ @@ -457,7 +458,6 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite if err := g.db.CreateGroup(ctx, nil, groupMembers); err != nil { return nil, err } - if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendNotification, opUserID, req.InvitedUserIDs...); err != nil { return nil, err } @@ -1036,7 +1036,8 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) } }() - g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + notficationFlag := true + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ficationFlag) } if req.GroupInfoForSet.GroupName != "" { num-- @@ -1097,7 +1098,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI return nil, err } - updatedData, err := UpdateGroupInfoExMap(ctx, req) + updatedData, normalFlag, groupNameFlag, notificationFlag, err := UpdateGroupInfoExMap(ctx, req) if len(updatedData) == 0 { return &pbgroup.SetGroupInfoExResp{}, nil } @@ -1125,41 +1126,38 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI tips.OpUser = g.groupMemberDB2PB(opMember, 0) } - num := len(updatedData) - - if req.Notification != nil { - num -= 3 - + if notificationFlag { if req.Notification.Value != "" { - func() { - conversation := &pbconversation.ConversationReq{ - ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), - ConversationType: constant.ReadGroupChatType, - GroupID: req.GroupID, - } + conversation := &pbconv.ConversationReq{ + ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), + ConversationType: constant.ReadGroupChatType, + GroupID: req.GroupID, + } - resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) - if err != nil { - log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) - return - } + resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) + if err != nil { + log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) + return nil, err + } - conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} - if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { - log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) - } - }() + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} + if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { + log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) + } - g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) + } else { + notificationFlag = false + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) } } - if req.GroupName != nil { - num-- + if groupNameFlag { g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) } - if num > 0 { + // if updatedData > 0, send the normal notification + if normalFlag { g.notification.GroupInfoSetNotification(ctx, tips) } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index a879d3b4a..9e310ef20 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -234,17 +234,17 @@ func (g *NotificationSender) groupMemberDB2PB(member *model.GroupMember, appMang return result, nil } */ -func (g *NotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) { - return g.fillOpUserByUserID(ctx, mcontext.GetOpUserID(ctx), opUser, groupID) +func (g *NotificationSender) fillOpUser(ctx context.Context, targetUser **sdkws.GroupMemberFullInfo, groupID string) (err error) { + return g.fillUserByUserID(ctx, mcontext.GetOpUserID(ctx), targetUser, groupID) } -func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID string, opUser **sdkws.GroupMemberFullInfo, groupID string) error { - if opUser == nil { +func (g *NotificationSender) fillUserByUserID(ctx context.Context, userID string, targetUser **sdkws.GroupMemberFullInfo, groupID string) error { + if targetUser == nil { return errs.ErrInternalServer.WrapMsg("**sdkws.GroupMemberFullInfo is nil") } if groupID != "" { if authverify.IsManagerUserID(userID, g.config.Share.IMAdminUserID) { - *opUser = &sdkws.GroupMemberFullInfo{ + *targetUser = &sdkws.GroupMemberFullInfo{ GroupID: groupID, UserID: userID, RoleLevel: constant.GroupAdmin, @@ -253,7 +253,7 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri } else { member, err := g.db.TakeGroupMember(ctx, groupID, userID) if err == nil { - *opUser = g.groupMemberDB2PB(member, 0) + *targetUser = g.groupMemberDB2PB(member, 0) } else if !(errors.Is(err, mongo.ErrNoDocuments) || errs.ErrRecordNotFound.Is(err)) { return err } @@ -263,8 +263,8 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri if err != nil { return err } - if *opUser == nil { - *opUser = &sdkws.GroupMemberFullInfo{ + if *targetUser == nil { + *targetUser = &sdkws.GroupMemberFullInfo{ GroupID: groupID, UserID: userID, Nickname: user.Nickname, @@ -272,11 +272,11 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri OperatorUserID: userID, } } else { - if (*opUser).Nickname == "" { - (*opUser).Nickname = user.Nickname + if (*targetUser).Nickname == "" { + (*targetUser).Nickname = user.Nickname } - if (*opUser).FaceURL == "" { - (*opUser).FaceURL = user.FaceURL + if (*targetUser).FaceURL == "" { + (*targetUser).FaceURL = user.FaceURL } } return nil @@ -350,7 +350,7 @@ func (g *NotificationSender) GroupInfoSetNameNotification(ctx context.Context, t g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips) } -func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) { +func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips, sendMessage *bool) { var err error defer func() { if err != nil { @@ -557,15 +557,13 @@ func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx co InvitedUserList: users, } opUserID := mcontext.GetOpUserID(ctx) - if err = g.fillOpUserByUserID(ctx, opUserID, &tips.OpUser, tips.Group.GroupID); err != nil { + if err = g.fillUserByUserID(ctx, opUserID, &tips.OpUser, tips.Group.GroupID); err != nil { return nil } - switch { - case invitedOpUserID == "": - case invitedOpUserID == opUserID: + if invitedOpUserID == opUserID { tips.InviterUser = tips.OpUser - default: - if err = g.fillOpUserByUserID(ctx, invitedOpUserID, &tips.InviterUser, tips.Group.GroupID); err != nil { + } else { + if err = g.fillUserByUserID(ctx, invitedOpUserID, &tips.InviterUser, tips.Group.GroupID); err != nil { return err } } @@ -782,7 +780,7 @@ func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Conte if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } - g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) + g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips) } @@ -807,6 +805,6 @@ func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx contex if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } - g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) + g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips) } diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 0592aa811..335f4481b 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -11,12 +11,12 @@ import ( "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" ) -func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { - vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID) +const versionSyncLimit = 500 + +func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { + vl, err := g.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID) if err != nil { return nil, err } @@ -132,152 +132,8 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou return resp, nil } -func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (resp *pbgroup.BatchGetIncrementalGroupMemberResp, err error) { - type VersionInfo struct { - GroupID string - VersionID string - VersionNumber uint64 - } - - var groupIDs []string - - groupsVersionMap := make(map[string]*VersionInfo) - groupsMap := make(map[string]*model.Group) - hasGroupUpdateMap := make(map[string]bool) - sortVersionMap := make(map[string]uint64) - - var targetKeys, versionIDs []string - var versionNumbers []uint64 - - var requestBodyLen int - - for _, group := range req.ReqList { - groupsVersionMap[group.GroupID] = &VersionInfo{ - GroupID: group.GroupID, - VersionID: group.VersionID, - VersionNumber: group.Version, - } - - groupIDs = append(groupIDs, group.GroupID) - } - - groups, err := s.db.FindGroup(ctx, groupIDs) - if err != nil { - return nil, errs.Wrap(err) - } - - for _, group := range groups { - if group.Status == constant.GroupStatusDismissed { - err = servererrs.ErrDismissedAlready.Wrap() - log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID) - - delete(groupsVersionMap, group.GroupID) - } else { - groupsMap[group.GroupID] = group - } - } - - for groupID, vInfo := range groupsVersionMap { - targetKeys = append(targetKeys, groupID) - versionIDs = append(versionIDs, vInfo.VersionID) - versionNumbers = append(versionNumbers, vInfo.VersionNumber) - } - - opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{ - Ctx: ctx, - TargetKeys: targetKeys, - VersionIDs: versionIDs, - VersionNumbers: versionNumbers, - Versions: func(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) { - vLogs, err := s.db.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits) - if err != nil { - return nil, errs.Wrap(err) - } - - for groupID, vlog := range vLogs { - vlogElems := make([]model.VersionLogElem, 0, len(vlog.Logs)) - for i, log := range vlog.Logs { - switch log.EID { - case model.VersionGroupChangeID: - vlog.LogLen-- - hasGroupUpdateMap[groupID] = true - case model.VersionSortChangeID: - vlog.LogLen-- - sortVersionMap[groupID] = uint64(log.Version) - default: - vlogElems = append(vlogElems, vlog.Logs[i]) - } - } - vlog.Logs = vlogElems - if vlog.LogLen > 0 { - hasGroupUpdateMap[groupID] = true - } - } - - return vLogs, nil - }, - CacheMaxVersions: s.db.BatchFindMaxGroupMemberVersionCache, - Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { - memberInfo, err := s.getGroupMembersInfo(ctx, groupID, ids) - if err != nil { - return nil, err - } - - return memberInfo, err - }, - Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp { - resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) - - for groupID, versionLog := range versions { - resList[groupID] = &pbgroup.GetIncrementalGroupMemberResp{ - VersionID: versionLog.ID.Hex(), - Version: uint64(versionLog.Version), - Full: fullMap[groupID], - Delete: deleteIdsMap[groupID], - Insert: insertListMap[groupID], - Update: updateListMap[groupID], - SortVersion: sortVersionMap[groupID], - } - - requestBodyLen += len(insertListMap[groupID]) + len(updateListMap[groupID]) + len(deleteIdsMap[groupID]) - if requestBodyLen > 200 { - break - } - } - - return &pbgroup.BatchGetIncrementalGroupMemberResp{ - RespList: resList, - } - }, - } - - resp, err = opt.Build() - if err != nil { - return nil, errs.Wrap(err) - } - - for groupID, val := range resp.RespList { - if val.Full || hasGroupUpdateMap[groupID] { - count, err := s.db.FindGroupMemberNum(ctx, groupID) - if err != nil { - return nil, err - } - - owner, err := s.db.TakeGroupOwner(ctx, groupID) - if err != nil { - return nil, err - } - - resp.RespList[groupID].Group = s.groupDB2PB(groupsMap[groupID], owner.UserID, count) - } - } - - return resp, nil - -} - -func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) { - if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { +func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) { + if err := authverify.CheckAccessV3(ctx, req.UserID, g.config.Share.IMAdminUserID); err != nil { return nil, err } opt := incrversion.Option[*sdkws.GroupInfo, pbgroup.GetIncrementalJoinGroupResp]{ @@ -301,3 +157,23 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. } return opt.Build() } + +func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) { + var num int + resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) + for _, memberReq := range req.ReqList { + if _, ok := resp[memberReq.GroupID]; ok { + continue + } + memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq) + if err != nil { + return nil, err + } + resp[memberReq.GroupID] = memberResp + num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete) + if num >= versionSyncLimit { + break + } + } + return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil +} diff --git a/pkg/notification/msg.go b/pkg/notification/msg.go index 1d2365b5a..98c314a04 100644 --- a/pkg/notification/msg.go +++ b/pkg/notification/msg.go @@ -190,6 +190,7 @@ func WithRpcGetUserName() NotificationOptions { opt.RpcGetUsername = true } } + func WithSendNotification(send *bool) NotificationOptions { return func(opt *notificationOpt) { opt.SendNotification = send