From 1f52b1ff45a9c943e6bbc33e8ff3dad473aea046 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Mon, 15 Apr 2024 17:24:32 +0800 Subject: [PATCH] refactor notification --- internal/msggateway/n_ws_server.go | 8 +- internal/push/push_to_client.go | 8 +- internal/rpc/conversation/conversaion.go | 10 +- internal/rpc/conversation/notification.go | 12 +- internal/rpc/friend/black.go | 6 +- internal/rpc/friend/friend.go | 22 +- internal/rpc/friend/notification.go | 44 ++- internal/rpc/group/group.go | 20 +- internal/rpc/group/notification.go | 334 ++++++++++++---------- internal/rpc/msg/as_read.go | 34 +-- internal/rpc/msg/notification.go | 8 +- internal/rpc/msg/revoke.go | 4 +- internal/rpc/msg/server.go | 8 +- internal/rpc/user/notification.go | 16 +- internal/rpc/user/user.go | 28 +- internal/tools/conversation.go | 4 +- pkg/common/notification/notification.go | 51 ---- pkg/common/webhook/http_client.go | 15 +- pkg/rpcclient/msg.go | 38 ++- 19 files changed, 298 insertions(+), 372 deletions(-) delete mode 100644 pkg/common/notification/notification.go diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index a691ee7b7..21213fca8 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/tools/mcontext" "net/http" @@ -54,11 +53,6 @@ type LongConnServer interface { MessageHandler } -const ( - webhookWorkerCount = 2 - webhookBufferSize = 100 -) - type WsServer struct { msgGatewayConfig *Config port int @@ -149,7 +143,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) { clients: newUserMap(), Compressor: NewGzipCompressor(), Encoder: NewGobEncoder(), - webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), + webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), }, nil } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index e4ed54b31..85b3e9056 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/errs" "sync" @@ -49,11 +48,6 @@ import ( "google.golang.org/grpc" ) -const ( - webhookWorkerCount = 2 - webhookBufferSize = 100 -) - type Pusher struct { config *Config database controller.PushDatabase @@ -83,7 +77,7 @@ func NewPusher(config *Config, discov discovery.SvcDiscoveryRegistry, offlinePus msgRpcClient: msgRpcClient, conversationRpcClient: conversationRpcClient, groupRpcClient: groupRpcClient, - webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), } } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 20eb46691..cc253e141 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -200,7 +200,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers if err != nil { return nil, err } - _ = c.conversationNotificationSender.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID, []string{req.Conversation.ConversationID}) + c.conversationNotificationSender.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID, []string{req.Conversation.ConversationID}) resp := &pbconversation.SetConversationResp{} return resp, nil } @@ -292,14 +292,8 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver return nil, err } for _, userID := range req.UserIDs { - err := c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID, + c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID, req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID) - if err != nil { - log.ZWarn(ctx, "send conversation set private notification failed", err, - "userID", userID, "conversationID", req.Conversation.ConversationID) - - continue - } } } diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 2eabc4e46..994e1d57a 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -34,7 +34,7 @@ func NewConversationNotificationSender(conf *config.Notification, msgRpcClient * // SetPrivate invote. func (c *ConversationNotificationSender) ConversationSetPrivateNotification(ctx context.Context, sendID, recvID string, isPrivateChat bool, conversationID string, -) error { +) { tips := &sdkws.ConversationSetPrivateTips{ RecvID: recvID, SendID: sendID, @@ -42,23 +42,23 @@ func (c *ConversationNotificationSender) ConversationSetPrivateNotification(ctx ConversationID: conversationID, } - return c.Notification(ctx, sendID, recvID, constant.ConversationPrivateChatNotification, tips) + c.Notification(ctx, sendID, recvID, constant.ConversationPrivateChatNotification, tips) } -func (c *ConversationNotificationSender) ConversationChangeNotification(ctx context.Context, userID string, conversationIDs []string) error { +func (c *ConversationNotificationSender) ConversationChangeNotification(ctx context.Context, userID string, conversationIDs []string) { tips := &sdkws.ConversationUpdateTips{ UserID: userID, ConversationIDList: conversationIDs, } - return c.Notification(ctx, userID, userID, constant.ConversationChangeNotification, tips) + c.Notification(ctx, userID, userID, constant.ConversationChangeNotification, tips) } func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( ctx context.Context, userID, conversationID string, unreadCountTime, hasReadSeq int64, -) error { +) { tips := &sdkws.ConversationHasReadTips{ UserID: userID, ConversationID: conversationID, @@ -66,5 +66,5 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( UnreadCountTime: unreadCountTime, } - return c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) + c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } diff --git a/internal/rpc/friend/black.go b/internal/rpc/friend/black.go index a0b202ffa..1f52286f3 100644 --- a/internal/rpc/friend/black.go +++ b/internal/rpc/friend/black.go @@ -86,10 +86,6 @@ func (s *friendServer) AddBlack(ctx context.Context, req *pbfriend.AddBlackReq) if err := s.blackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil { return nil, err } - - if err := s.notificationSender.BlackAddedNotification(ctx, req); err != nil { - return nil, err - } - + s.notificationSender.BlackAddedNotification(ctx, req) return &pbfriend.AddBlackResp{}, nil } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 43dfc8900..bffda3c04 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -18,7 +18,6 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -39,11 +38,6 @@ import ( "google.golang.org/grpc" ) -const ( - webhookWorkerCount = 2 - webhookBufferSize = 100 -) - type friendServer struct { friendDatabase controller.FriendDatabase blackDatabase controller.BlackDatabase @@ -120,7 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg RegisterCenter: client, conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), config: config, - webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), }) return nil @@ -152,10 +146,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply if err = s.friendDatabase.AddFriendRequest(ctx, req.FromUserID, req.ToUserID, req.ReqMsg, req.Ex); err != nil { return nil, err } - if err = s.notificationSender.FriendApplicationAddNotification(ctx, req); err != nil { - return nil, err - } - + s.notificationSender.FriendApplicationAddNotification(ctx, req) s.webhookAfterAddFriend(ctx, &s.config.WebhooksConfig.AfterAddFriend, req) return resp, nil } @@ -215,9 +206,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res if err != nil { return nil, err } - if err := s.notificationSender.FriendApplicationAgreedNotification(ctx, req); err != nil { - return nil, err - } + s.notificationSender.FriendApplicationAgreedNotification(ctx, req) return resp, nil } if req.HandleResult == constant.FriendResponseRefuse { @@ -472,9 +461,6 @@ func (s *friendServer) UpdateFriends( resp := &pbfriend.UpdateFriendsResp{} - err = s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs) - if err != nil { - return nil, errs.WrapMsg(err, "FriendsInfoUpdateNotification Error") - } + s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs) return resp, nil } diff --git a/internal/rpc/friend/notification.go b/internal/rpc/friend/notification.go index ec628fe29..f88c9664e 100644 --- a/internal/rpc/friend/notification.go +++ b/internal/rpc/friend/notification.go @@ -122,39 +122,39 @@ func (f *FriendNotificationSender) getFromToUserNickname( return users[fromUserID].Nickname, users[toUserID].Nickname, nil } -func (f *FriendNotificationSender) UserInfoUpdatedNotification(ctx context.Context, changedUserID string) error { +func (f *FriendNotificationSender) UserInfoUpdatedNotification(ctx context.Context, changedUserID string) { tips := sdkws.UserInfoUpdatedTips{UserID: changedUserID} - return f.Notification(ctx, mcontext.GetOpUserID(ctx), changedUserID, constant.UserInfoUpdatedNotification, &tips) + f.Notification(ctx, mcontext.GetOpUserID(ctx), changedUserID, constant.UserInfoUpdatedNotification, &tips) } -func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) error { +func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) { tips := sdkws.FriendApplicationTips{FromToUserID: &sdkws.FromToUserID{ FromUserID: req.FromUserID, ToUserID: req.ToUserID, }} - return f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips) + f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips) } func (f *FriendNotificationSender) FriendApplicationAgreedNotification( ctx context.Context, req *pbfriend.RespondFriendApplyReq, -) error { +) { tips := sdkws.FriendApplicationApprovedTips{FromToUserID: &sdkws.FromToUserID{ FromUserID: req.FromUserID, ToUserID: req.ToUserID, }, HandleMsg: req.HandleMsg} - return f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips) + f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips) } func (f *FriendNotificationSender) FriendApplicationRefusedNotification( ctx context.Context, req *pbfriend.RespondFriendApplyReq, -) error { +) { tips := sdkws.FriendApplicationApprovedTips{FromToUserID: &sdkws.FromToUserID{ FromUserID: req.FromUserID, ToUserID: req.ToUserID, }, HandleMsg: req.HandleMsg} - return f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationRejectedNotification, &tips) + f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationRejectedNotification, &tips) } func (f *FriendNotificationSender) FriendAddedNotification( @@ -178,36 +178,37 @@ func (f *FriendNotificationSender) FriendAddedNotification( if err != nil { return err } - return f.Notification(ctx, fromUserID, toUserID, constant.FriendAddedNotification, &tips) + f.Notification(ctx, fromUserID, toUserID, constant.FriendAddedNotification, &tips) + return nil } -func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context, req *pbfriend.DeleteFriendReq) error { +func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context, req *pbfriend.DeleteFriendReq) { tips := sdkws.FriendDeletedTips{FromToUserID: &sdkws.FromToUserID{ FromUserID: req.OwnerUserID, ToUserID: req.FriendUserID, }} - return f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips) + f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips) } -func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) error { +func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) { tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}} tips.FromToUserID.FromUserID = fromUserID tips.FromToUserID.ToUserID = toUserID - return f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips) + f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips) } -func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) error { +func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) { tips := sdkws.FriendsInfoUpdateTips{FromToUserID: &sdkws.FromToUserID{}} tips.FromToUserID.ToUserID = toUserID tips.FriendIDs = friendIDs - return f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips) + f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips) } -func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) error { +func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) { tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}} tips.FromToUserID.FromUserID = req.OwnerUserID tips.FromToUserID.ToUserID = req.BlackUserID - return f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackAddedNotification, &tips) + f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackAddedNotification, &tips) } func (f *FriendNotificationSender) BlackDeletedNotification(ctx context.Context, req *pbfriend.RemoveBlackReq) { @@ -215,15 +216,10 @@ func (f *FriendNotificationSender) BlackDeletedNotification(ctx context.Context, FromUserID: req.OwnerUserID, ToUserID: req.BlackUserID, }} - if err := f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackDeletedNotification, &blackDeletedTips); err != nil { - // err - } + f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackDeletedNotification, &blackDeletedTips) } func (f *FriendNotificationSender) FriendInfoUpdatedNotification(ctx context.Context, changedUserID string, needNotifiedUserID string) { tips := sdkws.UserInfoUpdatedTips{UserID: changedUserID} - if err := f.Notification(ctx, mcontext.GetOpUserID(ctx), needNotifiedUserID, - constant.FriendInfoUpdatedNotification, &tips); err != nil { - // err - } + f.Notification(ctx, mcontext.GetOpUserID(ctx), needNotifiedUserID, constant.FriendInfoUpdatedNotification, &tips) } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index fd9eb6873..450813967 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -20,7 +20,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "math/big" "math/rand" "strconv" @@ -55,11 +54,6 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) -const ( - webhookWorkerCount = 2 - webhookBufferSize = 100 -) - type groupServer struct { db controller.GroupDatabase user rpcclient.UserRpcClient @@ -120,7 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg gs.conversationRpcClient = conversationRpcClient gs.msgRpcClient = msgRpcClient gs.config = config - gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)) + gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) pbgroup.RegisterGroupServer(server, &gs) return nil } @@ -138,9 +132,7 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro groupIDs = append(groupIDs, member.GroupID) } for _, groupID := range groupIDs { - if err = s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID); err != nil { - return nil, err - } + s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID) } if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil { return nil, err @@ -929,7 +921,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq) if err != nil { return nil, err } - _ = s.notification.MemberQuitNotification(ctx, s.groupMemberDB2PB(member, 0)) + s.notification.MemberQuitNotification(ctx, s.groupMemberDB2PB(member, 0)) if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, []string{req.UserID}); err != nil { return nil, err } @@ -1024,14 +1016,14 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf log.ZWarn(ctx, "SetConversations", err, resp.UserIDs, conversation) } }() - _ = s.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + s.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) } if req.GroupInfoForSet.GroupName != "" { num-- - _ = s.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) + s.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) } if num > 0 { - _ = s.notification.GroupInfoSetNotification(ctx, tips) + s.notification.GroupInfoSetNotification(ctx, tips) } s.webhookAfterSetGroupInfo(ctx, &s.config.WebhooksConfig.AfterSetGroupInfo, req) diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 973dc1f3c..6d7cebcbc 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -287,114 +287,122 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws return nil } -func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) (err error) { +func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips) } -func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) (err error) { +func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName()) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName()) } -func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) (err error) { +func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips) } -func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) (err error) { +func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName()) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName()) } -func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) (err error) { +func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, req.GroupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, req.GroupID) if err != nil { - return err + return } - user, err := g.getUser(ctx, req.InviterUserID) + var user *sdkws.PublicUserInfo + user, err = g.getUser(ctx, req.InviterUserID) if err != nil { - return err + return } userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID) if err != nil { - return err + return } userIDs = append(userIDs, req.InviterUserID, mcontext.GetOpUserID(ctx)) tips := &sdkws.JoinGroupApplicationTips{Group: group, Applicant: user, ReqMsg: req.ReqMessage} for _, userID := range datautil.Distinct(userIDs) { - err = g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.JoinGroupApplicationNotification, tips) - if err != nil { - log.ZError(ctx, "JoinGroupApplicationNotification failed", err, "group", req.GroupID, "userID", userID) - } + g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.JoinGroupApplicationNotification, tips) } - return nil } -func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) (err error) { +func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, member.GroupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, member.GroupID) if err != nil { - return err + return } tips := &sdkws.MemberQuitTips{Group: group, QuitUser: member} - return g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips) } -func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) (err error) { +func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, req.GroupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, req.GroupID) if err != nil { - return err + return } - userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID) + var userIDs []string + userIDs, err = g.getGroupOwnerAndAdminUserID(ctx, req.GroupID) if err != nil { - return err + return } tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg} if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + return } for _, userID := range append(userIDs, req.FromUserID) { if userID == req.FromUserID { @@ -402,31 +410,30 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte } else { tips.ReceiverAs = 1 } - err = g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips) - if err != nil { - log.ZError(ctx, "failed", err) - } + g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips) } - return nil } -func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) (err error) { +func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, req.GroupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, req.GroupID) if err != nil { - return err + return } - userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID) + var userIDs []string + userIDs, err = g.getGroupOwnerAndAdminUserID(ctx, req.GroupID) if err != nil { - return err + return } tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg} if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + return } for _, userID := range append(userIDs, req.FromUserID) { if userID == req.FromUserID { @@ -434,254 +441,281 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte } else { tips.ReceiverAs = 1 } - err = g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips) - if err != nil { - log.ZError(ctx, "failed", err) - } + g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips) } - return nil } -func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) (err error) { +func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, req.GroupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, req.GroupID) if err != nil { - return err + return } opUserID := mcontext.GetOpUserID(ctx) - member, err := g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID}) + var member map[string]*sdkws.GroupMemberFullInfo + member, err = g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID}) if err != nil { - return err + return } tips := &sdkws.GroupOwnerTransferredTips{Group: group, OpUser: member[opUserID], NewGroupOwner: member[req.NewOwnerUserID]} - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips) } -func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) (err error) { +func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips) } -func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) (err error) { +func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, groupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } + + var users []*sdkws.GroupMemberFullInfo + users, err = g.getGroupMembers(ctx, groupID, invitedUserIDList) if err != nil { - return err - } - users, err := g.getGroupMembers(ctx, groupID, invitedUserIDList) - if err != nil { - return err + return } tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users} - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err - } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) + err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) } -func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) (err error) { +func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, groupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } - user, err := g.getGroupMember(ctx, groupID, entrantUserID) + var user *sdkws.GroupMemberFullInfo + user, err = g.getGroupMember(ctx, groupID, entrantUserID) if err != nil { - return err + return } tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user} - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips) } -func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) (err error) { +func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips) } -func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) (err error) { +func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, groupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } - user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) + var user map[string]*sdkws.GroupMemberFullInfo + user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) if err != nil { - return err + return } tips := &sdkws.GroupMemberMutedTips{ Group: group, MutedSeconds: mutedSeconds, OpUser: user[mcontext.GetOpUserID(ctx)], MutedUser: user[groupMemberUserID], } - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips) } -func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) { +func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, groupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } - user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) + var user map[string]*sdkws.GroupMemberFullInfo + user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) if err != nil { - return err + return } tips := &sdkws.GroupMemberCancelMutedTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], MutedUser: user[groupMemberUserID]} - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips) } -func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, groupID string) (err error) { +func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, groupID string) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, groupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } - users, err := g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)}) + var users []*sdkws.GroupMemberFullInfo + users, err = g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)}) if err != nil { - return err + return } tips := &sdkws.GroupMutedTips{Group: group} if len(users) > 0 { tips.OpUser = users[0] } - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips) } -func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) (err error) { +func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, groupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } - users, err := g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)}) + var users []*sdkws.GroupMemberFullInfo + users, err = g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)}) if err != nil { - return err + return } tips := &sdkws.GroupCancelMutedTips{Group: group} if len(users) > 0 { tips.OpUser = users[0] } - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips) } -func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) { - group, err := g.getGroupInfo(ctx, groupID) +func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) { + var err error + defer func() { + if err != nil { + log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) + } + }() + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } - user, err := g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID}) + var user map[string]*sdkws.GroupMemberFullInfo + user, err = g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID}) if err != nil { - return err + return } tips := &sdkws.GroupMemberInfoSetTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], ChangedUser: user[groupMemberUserID]} - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips) } -func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) { - group, err := g.getGroupInfo(ctx, groupID) +func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) { + var err error + defer func() { + if err != nil { + log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) + } + }() + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) if err != nil { - return err + return } tips := &sdkws.GroupMemberInfoSetTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], ChangedUser: user[groupMemberUserID]} - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips) } -func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) { +func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) { + var err error defer func() { if err != nil { log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - group, err := g.getGroupInfo(ctx, groupID) + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) if err != nil { - return err + return } - user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) + var user map[string]*sdkws.GroupMemberFullInfo + user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) if err != nil { - return err + return } tips := &sdkws.GroupMemberInfoSetTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], ChangedUser: user[groupMemberUserID]} - if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { - return err + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return } - return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips) } -func (g *GroupNotificationSender) SuperGroupNotification(ctx context.Context, sendID, recvID string) (err error) { - defer func() { - if err != nil { - log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) - } - }() - err = g.Notification(ctx, sendID, recvID, constant.SuperGroupUpdateNotification, nil) - return err +func (g *GroupNotificationSender) SuperGroupNotification(ctx context.Context, sendID, recvID string) { + g.Notification(ctx, sendID, recvID, constant.SuperGroupUpdateNotification, nil) } diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index cf3d5f546..992374d57 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -83,9 +83,7 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { return nil, err } - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq); err != nil { - return nil, err - } + m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq) return &msg.SetConversationHasReadSeqResp{}, nil } @@ -126,11 +124,8 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR ContentType: conversation.ConversationType, } m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback) - - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, - m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq); err != nil { - return nil, err - } + m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, + m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq) return &msg.MarkMsgsAsReadResp{}, nil } @@ -169,11 +164,8 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon } hasReadSeq = req.HasReadSeq } - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, - m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { - return nil, err - } - + m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, + m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq) } else if conversation.ConversationType == constant.SuperGroupChatType || conversation.ConversationType == constant.NotificationChatType { if req.HasReadSeq > hasReadSeq { @@ -183,11 +175,8 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon } hasReadSeq = req.HasReadSeq } - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, - req.UserID, seqs, hasReadSeq); err != nil { - return nil, err - } - + m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, + req.UserID, seqs, hasReadSeq) } reqCall := &cbapi.CallbackGroupMsgReadReq{ @@ -201,16 +190,13 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon return &msg.MarkConversationAsReadResp{}, nil } -func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error { +func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) { tips := &sdkws.MarkAsReadTips{ MarkAsReadUserID: sendID, ConversationID: conversationID, Seqs: seqs, HasReadSeq: hasReadSeq, } - err := m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) - if err != nil { - log.ZWarn(ctx, "send has read Receipt err", err) - } - return nil + m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) + } diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go index 80f034f45..707d6634f 100644 --- a/internal/rpc/msg/notification.go +++ b/internal/rpc/msg/notification.go @@ -30,21 +30,21 @@ func NewMsgNotificationSender(config *Config, opts ...rpcclient.NotificationSend return &MsgNotificationSender{rpcclient.NewNotificationSender(&config.NotificationConfig, opts...)} } -func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) error { +func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) { tips := sdkws.DeleteMsgsTips{ UserID: userID, ConversationID: conversationID, Seqs: seqs, } - return m.Notification(ctx, userID, userID, constant.DeleteMsgsNotification, &tips) + m.Notification(ctx, userID, userID, constant.DeleteMsgsNotification, &tips) } -func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error { +func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) { tips := &sdkws.MarkAsReadTips{ MarkAsReadUserID: sendID, ConversationID: conversationID, Seqs: seqs, HasReadSeq: hasReadSeq, } - return m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sesstionType, tips) + m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sesstionType, tips) } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 4c9b5dcf0..be5af0e49 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -123,9 +123,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. } else { recvID = msgs[0].RecvID } - if err := m.notificationSender.NotificationWithSessionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil { - return nil, err - } + m.notificationSender.NotificationWithSessionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips) m.webhookAfterRevokeMsg(ctx, &m.config.WebhooksConfig.AfterRevokeMsg, req) return &msg.RevokeMsgResp{}, nil } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index c88170f1c..794fbec72 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -18,7 +18,6 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" @@ -34,11 +33,6 @@ import ( "google.golang.org/grpc" ) -const ( - webhookWorkerCount = 2 - webhookBufferSize = 100 -) - type ( // MessageInterceptorChain defines a chain of message interceptor functions. MessageInterceptorChain []MessageInterceptorFunc @@ -109,7 +103,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, &config.LocalCacheConfig, rdb), FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, &config.LocalCacheConfig, rdb), config: config, - webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), } s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) diff --git a/internal/rpc/user/notification.go b/internal/rpc/user/notification.go index aec71d7c6..348cd5628 100644 --- a/internal/rpc/user/notification.go +++ b/internal/rpc/user/notification.go @@ -97,24 +97,24 @@ func NewUserNotificationSender(config *Config, msgRpcClient *rpcclient.MessageRp func (u *UserNotificationSender) UserStatusChangeNotification( ctx context.Context, tips *sdkws.UserStatusChangeTips, -) error { - return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserStatusChangeNotification, tips) +) { + u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserStatusChangeNotification, tips) } func (u *UserNotificationSender) UserCommandUpdateNotification( ctx context.Context, tips *sdkws.UserCommandUpdateTips, -) error { - return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandUpdateNotification, tips) +) { + u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandUpdateNotification, tips) } func (u *UserNotificationSender) UserCommandAddNotification( ctx context.Context, tips *sdkws.UserCommandAddTips, -) error { - return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandAddNotification, tips) +) { + u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandAddNotification, tips) } func (u *UserNotificationSender) UserCommandDeleteNotification( ctx context.Context, tips *sdkws.UserCommandDeleteTips, -) error { - return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandDeleteNotification, tips) +) { + u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandDeleteNotification, tips) } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 1fe7c5076..c453ac9f8 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -19,7 +19,6 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/db/redisutil" "math/rand" "strings" @@ -46,11 +45,6 @@ import ( "google.golang.org/grpc" ) -const ( - webhookWorkerCount = 2 - webhookBufferSize = 100 -) - type userServer struct { db controller.UserDatabase friendNotificationSender *friend.FriendNotificationSender @@ -107,7 +101,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi friendNotificationSender: friend.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, friend.WithDBFunc(database.FindWithError)), userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)), config: config, - webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), } pbuser.RegisterUserServer(server, u) return u.db.InitOnce(context.Background(), users) @@ -142,7 +136,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { return nil, err } - _ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) + s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) if err != nil { return nil, err @@ -174,7 +168,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse if err = s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { return nil, err } - _ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) + s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) if err != nil { return nil, err @@ -418,10 +412,7 @@ func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.Proc FromUserID: req.UserID, ToUserID: req.UserID, } - err = s.userNotificationSender.UserCommandAddNotification(ctx, tips) - if err != nil { - return nil, err - } + s.userNotificationSender.UserCommandAddNotification(ctx, tips) return &pbuser.ProcessUserCommandAddResp{}, nil } @@ -440,11 +431,7 @@ func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.P FromUserID: req.UserID, ToUserID: req.UserID, } - err = s.userNotificationSender.UserCommandDeleteNotification(ctx, tips) - if err != nil { - return nil, err - } - + s.userNotificationSender.UserCommandDeleteNotification(ctx, tips) return &pbuser.ProcessUserCommandDeleteResp{}, nil } @@ -473,10 +460,7 @@ func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.P FromUserID: req.UserID, ToUserID: req.UserID, } - err = s.userNotificationSender.UserCommandUpdateNotification(ctx, tips) - if err != nil { - return nil, err - } + s.userNotificationSender.UserCommandUpdateNotification(ctx, tips) return &pbuser.ProcessUserCommandUpdateResp{}, nil } diff --git a/internal/tools/conversation.go b/internal/tools/conversation.go index 099c984a6..3e2a88ffd 100644 --- a/internal/tools/conversation.go +++ b/internal/tools/conversation.go @@ -142,9 +142,7 @@ func (c *MsgTool) ConversationsDestructMsgs() { log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) continue } - if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { - log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) - } + c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs) } } } diff --git a/pkg/common/notification/notification.go b/pkg/common/notification/notification.go deleted file mode 100644 index d29b97ef7..000000000 --- a/pkg/common/notification/notification.go +++ /dev/null @@ -1,51 +0,0 @@ -package notification - -import ( - "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" - "github.com/openimsdk/tools/utils/httputil" - "net/http" -) - -package webhook - -import ( -"context" -"encoding/json" -"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" -"github.com/openimsdk/open-im-server/v3/pkg/common/config" -"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" -"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" -"github.com/openimsdk/protocol/constant" -"github.com/openimsdk/tools/log" -"github.com/openimsdk/tools/utils/httputil" -"net/http" -) - -type Client struct { - url string - queue *memAsyncQueue.MemoryQueue -} - -func NewWebhookClient(url string, queue *memAsyncQueue.MemoryQueue) *Client { - http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // Enhance the default number of max connections per host - return &Client{ - - url: url, - queue: queue, - } -} - -func (c *Client) SyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, before *config.BeforeConfig) error { - if before.Enable { - return c.post(ctx, command, req, resp, before.Timeout) - } - return nil -} - -func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig) { - if after.Enable { - c.queue.Push(func() { c.post(ctx, command, req, resp, after.Timeout) }) - } -} \ No newline at end of file diff --git a/pkg/common/webhook/http_client.go b/pkg/common/webhook/http_client.go index 325308586..9055172b2 100644 --- a/pkg/common/webhook/http_client.go +++ b/pkg/common/webhook/http_client.go @@ -33,8 +33,21 @@ type Client struct { queue *memAsyncQueue.MemoryQueue } -func NewWebhookClient(url string, queue *memAsyncQueue.MemoryQueue) *Client { +const ( + webhookWorkerCount = 2 + webhookBufferSize = 100 +) + +func NewWebhookClient(url string, options ...*memAsyncQueue.MemoryQueue) *Client { + var queue *memAsyncQueue.MemoryQueue + if len(options) > 0 && options[0] != nil { + queue = options[0] + } else { + queue = memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize) + } + http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // Enhance the default number of max connections per host + return &Client{ client: httputil.NewHTTPClient(httputil.NewClientConfig()), url: url, diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 15c538449..7ec8f38dd 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -17,14 +17,12 @@ package rpcclient import ( "context" "encoding/json" - "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/idutil" @@ -217,6 +215,13 @@ type NotificationSender struct { sessionTypeConf map[int32]int32 sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) getUserInfo func(ctx context.Context, userID string) (*sdkws.UserInfo, error) + queue *memAsyncQueue.MemoryQueue +} + +func WithQueue(queue *memAsyncQueue.MemoryQueue) NotificationSenderOptions { + return func(s *NotificationSender) { + s.queue = queue + } } type NotificationSenderOptions func(*NotificationSender) @@ -239,11 +244,19 @@ func WithUserRpcClient(userRpcClient *UserRpcClient) NotificationSenderOptions { } } +const ( + notificationWorkerCount = 2 + notificationBufferSize = 200 +) + func NewNotificationSender(conf *config.Notification, opts ...NotificationSenderOptions) *NotificationSender { notificationSender := &NotificationSender{contentTypeConf: newContentTypeConf(conf), sessionTypeConf: newSessionTypeConf()} for _, opt := range opts { opt(notificationSender) } + if notificationSender.queue == nil { + notificationSender.queue = memAsyncQueue.NewMemoryQueue(notificationWorkerCount, notificationBufferSize) + } return notificationSender } @@ -259,11 +272,12 @@ func WithRpcGetUserName() NotificationOptions { } } -func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) { +func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) { n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)} content, err := json.Marshal(&n) if err != nil { - return errs.WrapMsg(err, "json.Marshal failed", "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", jsonutil.StructToJsonString(m)) + log.ZError(ctx, "json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", jsonutil.StructToJsonString(m)) + return } notificationOpt := ¬ificationOpt{} for _, opt := range opts { @@ -275,7 +289,8 @@ func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, se if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil { userInfo, err = s.getUserInfo(ctx, sendID) if err != nil { - return errs.WrapMsg(err, "getUserInfo failed", "sendID", sendID) + log.ZError(ctx, "getUserInfo failed", err, "sendID", sendID) + return } msg.SenderNickname = userInfo.Nickname msg.SenderFaceURL = userInfo.FaceURL @@ -303,13 +318,16 @@ func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, se req.MsgData = &msg _, err = s.sendMsg(ctx, &req) if err != nil { - return errs.WrapMsg(err, "SendMsg failed", "req", fmt.Sprintf("%+v", req)) + log.ZError(ctx, "SendMsg failed", err, "req", req.String()) } - return err } -func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) error { - return s.NotificationWithSessionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...) +func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) { + s.queue.Push(func() { s.send(ctx, sendID, recvID, contentType, sesstionType, m, opts...) }) +} + +func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) { + s.NotificationWithSessionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...) } func (s *NotificationSender) SetOptionsByContentType(_ context.Context, options map[string]bool, contentType int32) {