diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index f49c46207..5fff8b9fe 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -11,3 +11,6 @@ prometheus: enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup ports: [ 20103 ] + + +enableHistoryForNewMembers: true \ No newline at end of file diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 284d02d5d..fc6574e54 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -436,6 +436,14 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc return &pbconversation.SetConversationMaxSeqResp{}, nil } +func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { + if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, + map[string]any{"min_seq": req.MinSeq}); err != nil { + return nil, err + } + return &pbconversation.SetConversationMinSeqResp{}, nil +} + func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconversation.GetConversationIDsReq) (*pbconversation.GetConversationIDsResp, error) { conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID) if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index e4069585f..9589ed249 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -105,13 +105,20 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) gs.db = database gs.user = userRpcClient - gs.notification = NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { - users, err := userRpcClient.GetUsersInfo(ctx, userIDs) - if err != nil { - return nil, err - } - return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil - }) + gs.notification = NewGroupNotificationSender( + database, + &msgRpcClient, + &userRpcClient, + &conversationRpcClient, + config, + func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { + users, err := userRpcClient.GetUsersInfo(ctx, userIDs) + if err != nil { + return nil, err + } + return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil + }, + ) localcache.InitLocalCache(&config.LocalCacheConfig) gs.conversationRpcClient = conversationRpcClient gs.msgRpcClient = msgRpcClient @@ -450,10 +457,10 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite if err := g.db.CreateGroup(ctx, nil, groupMembers); err != nil { return nil, err } - if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, req.InvitedUserIDs); err != nil { + + if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.InvitedUserIDs...); err != nil { return nil, err } - g.notification.MemberInvitedNotification(ctx, req.GroupID, req.Reason, req.InvitedUserIDs) return &pbgroup.InviteUserToGroupResp{}, nil } @@ -822,14 +829,13 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup } switch req.HandleResult { case constant.GroupResponseAgree: - if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.FromUserID}); err != nil { - return nil, err - } g.notification.GroupApplicationAcceptedNotification(ctx, req) if member == nil { log.ZDebug(ctx, "GroupApplicationResponse", "member is nil") } else { - g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID) + if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil { + return nil, err + } } case constant.GroupResponseRefuse: g.notification.GroupApplicationRejectedNotification(ctx, req) @@ -889,10 +895,9 @@ func (g *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) return nil, err } - if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.InviterUserID}); err != nil { + if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID); err != nil { return nil, err } - g.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID) g.webhookAfterJoinGroup(ctx, &g.config.WebhooksConfig.AfterJoinGroup, req) return &pbgroup.JoinGroupResp{}, nil @@ -1487,7 +1492,7 @@ func (g *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr userIDs := make([]string, 0, len(members)+1) for _, member := range members { if _, ok := temp[member.UserID]; ok { - return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("repeat group %g user %g", member.GroupID, member.UserID)) + return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("repeat group %s user %s", member.GroupID, member.UserID)) } temp[member.UserID] = struct{}{} userIDs = append(userIDs, member.UserID) @@ -1605,7 +1610,7 @@ func (g *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbgroup.Get return nil, err } if ids := datautil.Single(req.GroupIDs, datautil.Keys(groupUserMap)); len(ids) > 0 { - return nil, servererrs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %g not found member", strings.Join(ids, ","))) + return nil, servererrs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %s not found member", strings.Join(ids, ","))) } return &pbgroup.GetGroupAbstractInfoResp{ GroupAbstractInfos: datautil.Slice(groups, func(group *model.Group) *pbgroup.GroupAbstractInfo { diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 9815167e9..72ca9b813 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -21,6 +21,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -43,12 +44,22 @@ const ( adminReceiver ) -func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender { +func NewGroupNotificationSender( + db controller.GroupDatabase, + msgRpcClient *rpcclient.MessageRpcClient, + userRpcClient *rpcclient.UserRpcClient, + conversationRpcClient *rpcclient.ConversationRpcClient, + config *Config, + fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error), +) *GroupNotificationSender { return &GroupNotificationSender{ NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)), getUsersInfo: fn, db: db, config: config, + + conversationRpcClient: conversationRpcClient, + msgRpcClient: msgRpcClient, } } @@ -57,6 +68,9 @@ type GroupNotificationSender struct { getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) db controller.GroupDatabase config *Config + + conversationRpcClient *rpcclient.ConversationRpcClient + msgRpcClient *rpcclient.MessageRpcClient } func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error { @@ -494,50 +508,43 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips) } -func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) { +func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error { var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - var group *sdkws.GroupInfo - group, err = g.getGroupInfo(ctx, groupID) - if err != nil { - return + + if !g.config.RpcConfig.EnableHistoryForNewMembers { + conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) + maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID) + if err != nil { + return err + } + err = g.conversationRpcClient.SetConversationMinSeq(ctx, entrantUserID, conversationID, maxSeq) + if err != nil { + return err + } } - var users []*sdkws.GroupMemberFullInfo - users, err = g.getGroupMembers(ctx, groupID, invitedUserIDList) - if err != nil { - return + if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil { + return err } - tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users} - err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID) - g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) - g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) -} -func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) { - var err error - defer func() { - if err != nil { - log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) - } - }() var group *sdkws.GroupInfo group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return + return err } - var user *sdkws.GroupMemberFullInfo - user, err = g.getGroupMember(ctx, groupID, entrantUserID) + users, err := g.getGroupMembers(ctx, groupID, entrantUserID) if err != nil { - return + return err } - tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user} + tips := &sdkws.MemberEnterTips{Group: group, EntrantUsers: users} g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips) + return nil } func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) { diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index ebb3bf0b7..5261e034c 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -258,7 +258,8 @@ type Group struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` + EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` } type Msg struct { diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 8f95f86a6..ccca85619 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -77,6 +77,11 @@ func (c *ConversationRpcClient) SetConversationMaxSeq(ctx context.Context, owner return err } +func (c *ConversationRpcClient) SetConversationMinSeq(ctx context.Context, ownerUserIDs []string, conversationID string, minSeq int64) error { + _, err := c.Client.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MinSeq: minSeq}) + return err +} + func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []string, conversation *pbconversation.ConversationReq) error { _, err := c.Client.SetConversations(ctx, &pbconversation.SetConversationsReq{UserIDs: userIDs, Conversation: conversation}) return err