Merge branch 'main' of github.com:openimsdk/open-im-server into fix/userinfo-update

pull/2552/head
Monet Lee 1 year ago
commit 9d9b963278

@ -11,3 +11,6 @@ prometheus:
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20103 ]
enableHistoryForNewMembers: true

@ -436,6 +436,14 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
return &pbconversation.SetConversationMaxSeqResp{}, nil
}
func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
map[string]any{"min_seq": req.MinSeq}); err != nil {
return nil, err
}
return &pbconversation.SetConversationMinSeqResp{}, nil
}
func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconversation.GetConversationIDsReq) (*pbconversation.GetConversationIDsResp, error) {
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {

@ -105,13 +105,20 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.user = userRpcClient
gs.notification = NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
})
gs.notification = NewGroupNotificationSender(
database,
&msgRpcClient,
&userRpcClient,
&conversationRpcClient,
config,
func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
},
)
localcache.InitLocalCache(&config.LocalCacheConfig)
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
@ -450,10 +457,10 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
if err := g.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, req.InvitedUserIDs); err != nil {
if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.InvitedUserIDs...); err != nil {
return nil, err
}
g.notification.MemberInvitedNotification(ctx, req.GroupID, req.Reason, req.InvitedUserIDs)
return &pbgroup.InviteUserToGroupResp{}, nil
}
@ -822,14 +829,13 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
}
switch req.HandleResult {
case constant.GroupResponseAgree:
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.FromUserID}); err != nil {
return nil, err
}
g.notification.GroupApplicationAcceptedNotification(ctx, req)
if member == nil {
log.ZDebug(ctx, "GroupApplicationResponse", "member is nil")
} else {
g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID)
if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil {
return nil, err
}
}
case constant.GroupResponseRefuse:
g.notification.GroupApplicationRejectedNotification(ctx, req)
@ -889,10 +895,9 @@ func (g *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
return nil, err
}
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.InviterUserID}); err != nil {
if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID); err != nil {
return nil, err
}
g.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID)
g.webhookAfterJoinGroup(ctx, &g.config.WebhooksConfig.AfterJoinGroup, req)
return &pbgroup.JoinGroupResp{}, nil
@ -1487,7 +1492,7 @@ func (g *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr
userIDs := make([]string, 0, len(members)+1)
for _, member := range members {
if _, ok := temp[member.UserID]; ok {
return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("repeat group %g user %g", member.GroupID, member.UserID))
return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("repeat group %s user %s", member.GroupID, member.UserID))
}
temp[member.UserID] = struct{}{}
userIDs = append(userIDs, member.UserID)
@ -1605,7 +1610,7 @@ func (g *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbgroup.Get
return nil, err
}
if ids := datautil.Single(req.GroupIDs, datautil.Keys(groupUserMap)); len(ids) > 0 {
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %g not found member", strings.Join(ids, ",")))
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %s not found member", strings.Join(ids, ",")))
}
return &pbgroup.GetGroupAbstractInfoResp{
GroupAbstractInfos: datautil.Slice(groups, func(group *model.Group) *pbgroup.GroupAbstractInfo {

@ -21,6 +21,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -43,12 +44,22 @@ const (
adminReceiver
)
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
func NewGroupNotificationSender(
db controller.GroupDatabase,
msgRpcClient *rpcclient.MessageRpcClient,
userRpcClient *rpcclient.UserRpcClient,
conversationRpcClient *rpcclient.ConversationRpcClient,
config *Config,
fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error),
) *GroupNotificationSender {
return &GroupNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
getUsersInfo: fn,
db: db,
config: config,
conversationRpcClient: conversationRpcClient,
msgRpcClient: msgRpcClient,
}
}
@ -57,6 +68,9 @@ type GroupNotificationSender struct {
getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)
db controller.GroupDatabase
config *Config
conversationRpcClient *rpcclient.ConversationRpcClient
msgRpcClient *rpcclient.MessageRpcClient
}
func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error {
@ -494,50 +508,43 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}
func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) {
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return
if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
return err
}
err = g.conversationRpcClient.SetConversationMinSeq(ctx, entrantUserID, conversationID, maxSeq)
if err != nil {
return err
}
}
var users []*sdkws.GroupMemberFullInfo
users, err = g.getGroupMembers(ctx, groupID, invitedUserIDList)
if err != nil {
return
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil {
return err
}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID)
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
}
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return
return err
}
var user *sdkws.GroupMemberFullInfo
user, err = g.getGroupMember(ctx, groupID, entrantUserID)
users, err := g.getGroupMembers(ctx, groupID, entrantUserID)
if err != nil {
return
return err
}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUsers: users}
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
return nil
}
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {

@ -258,7 +258,8 @@ type Group struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
}
type Msg struct {

@ -77,6 +77,11 @@ func (c *ConversationRpcClient) SetConversationMaxSeq(ctx context.Context, owner
return err
}
func (c *ConversationRpcClient) SetConversationMinSeq(ctx context.Context, ownerUserIDs []string, conversationID string, minSeq int64) error {
_, err := c.Client.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MinSeq: minSeq})
return err
}
func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []string, conversation *pbconversation.ConversationReq) error {
_, err := c.Client.SetConversations(ctx, &pbconversation.SetConversationsReq{UserIDs: userIDs, Conversation: conversation})
return err

Loading…
Cancel
Save