refactor notification

pull/2148/head
skiffer-git 1 year ago
parent 777fa8c2cc
commit 1f52b1ff45

@ -18,7 +18,6 @@ import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
pbAuth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/mcontext"
"net/http"
@ -54,11 +53,6 @@ type LongConnServer interface {
MessageHandler
}
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type WsServer struct {
msgGatewayConfig *Config
port int
@ -149,7 +143,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) {
clients: newUserMap(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
}, nil
}

@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/errs"
"sync"
@ -49,11 +48,6 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type Pusher struct {
config *Config
database controller.PushDatabase
@ -83,7 +77,7 @@ func NewPusher(config *Config, discov discovery.SvcDiscoveryRegistry, offlinePus
msgRpcClient: msgRpcClient,
conversationRpcClient: conversationRpcClient,
groupRpcClient: groupRpcClient,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
}
}

@ -200,7 +200,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers
if err != nil {
return nil, err
}
_ = c.conversationNotificationSender.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID, []string{req.Conversation.ConversationID})
c.conversationNotificationSender.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID, []string{req.Conversation.ConversationID})
resp := &pbconversation.SetConversationResp{}
return resp, nil
}
@ -292,14 +292,8 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return nil, err
}
for _, userID := range req.UserIDs {
err := c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID,
c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID,
req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID)
if err != nil {
log.ZWarn(ctx, "send conversation set private notification failed", err,
"userID", userID, "conversationID", req.Conversation.ConversationID)
continue
}
}
}

@ -34,7 +34,7 @@ func NewConversationNotificationSender(conf *config.Notification, msgRpcClient *
// SetPrivate invote.
func (c *ConversationNotificationSender) ConversationSetPrivateNotification(ctx context.Context, sendID, recvID string,
isPrivateChat bool, conversationID string,
) error {
) {
tips := &sdkws.ConversationSetPrivateTips{
RecvID: recvID,
SendID: sendID,
@ -42,23 +42,23 @@ func (c *ConversationNotificationSender) ConversationSetPrivateNotification(ctx
ConversationID: conversationID,
}
return c.Notification(ctx, sendID, recvID, constant.ConversationPrivateChatNotification, tips)
c.Notification(ctx, sendID, recvID, constant.ConversationPrivateChatNotification, tips)
}
func (c *ConversationNotificationSender) ConversationChangeNotification(ctx context.Context, userID string, conversationIDs []string) error {
func (c *ConversationNotificationSender) ConversationChangeNotification(ctx context.Context, userID string, conversationIDs []string) {
tips := &sdkws.ConversationUpdateTips{
UserID: userID,
ConversationIDList: conversationIDs,
}
return c.Notification(ctx, userID, userID, constant.ConversationChangeNotification, tips)
c.Notification(ctx, userID, userID, constant.ConversationChangeNotification, tips)
}
func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
ctx context.Context,
userID, conversationID string,
unreadCountTime, hasReadSeq int64,
) error {
) {
tips := &sdkws.ConversationHasReadTips{
UserID: userID,
ConversationID: conversationID,
@ -66,5 +66,5 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
UnreadCountTime: unreadCountTime,
}
return c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
}

@ -86,10 +86,6 @@ func (s *friendServer) AddBlack(ctx context.Context, req *pbfriend.AddBlackReq)
if err := s.blackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil {
return nil, err
}
if err := s.notificationSender.BlackAddedNotification(ctx, req); err != nil {
return nil, err
}
s.notificationSender.BlackAddedNotification(ctx, req)
return &pbfriend.AddBlackResp{}, nil
}

@ -18,7 +18,6 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -39,11 +38,6 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type friendServer struct {
friendDatabase controller.FriendDatabase
blackDatabase controller.BlackDatabase
@ -120,7 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
})
return nil
@ -152,10 +146,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply
if err = s.friendDatabase.AddFriendRequest(ctx, req.FromUserID, req.ToUserID, req.ReqMsg, req.Ex); err != nil {
return nil, err
}
if err = s.notificationSender.FriendApplicationAddNotification(ctx, req); err != nil {
return nil, err
}
s.notificationSender.FriendApplicationAddNotification(ctx, req)
s.webhookAfterAddFriend(ctx, &s.config.WebhooksConfig.AfterAddFriend, req)
return resp, nil
}
@ -215,9 +206,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
if err != nil {
return nil, err
}
if err := s.notificationSender.FriendApplicationAgreedNotification(ctx, req); err != nil {
return nil, err
}
s.notificationSender.FriendApplicationAgreedNotification(ctx, req)
return resp, nil
}
if req.HandleResult == constant.FriendResponseRefuse {
@ -472,9 +461,6 @@ func (s *friendServer) UpdateFriends(
resp := &pbfriend.UpdateFriendsResp{}
err = s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, errs.WrapMsg(err, "FriendsInfoUpdateNotification Error")
}
s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs)
return resp, nil
}

@ -122,39 +122,39 @@ func (f *FriendNotificationSender) getFromToUserNickname(
return users[fromUserID].Nickname, users[toUserID].Nickname, nil
}
func (f *FriendNotificationSender) UserInfoUpdatedNotification(ctx context.Context, changedUserID string) error {
func (f *FriendNotificationSender) UserInfoUpdatedNotification(ctx context.Context, changedUserID string) {
tips := sdkws.UserInfoUpdatedTips{UserID: changedUserID}
return f.Notification(ctx, mcontext.GetOpUserID(ctx), changedUserID, constant.UserInfoUpdatedNotification, &tips)
f.Notification(ctx, mcontext.GetOpUserID(ctx), changedUserID, constant.UserInfoUpdatedNotification, &tips)
}
func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) error {
func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) {
tips := sdkws.FriendApplicationTips{FromToUserID: &sdkws.FromToUserID{
FromUserID: req.FromUserID,
ToUserID: req.ToUserID,
}}
return f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
}
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(
ctx context.Context,
req *pbfriend.RespondFriendApplyReq,
) error {
) {
tips := sdkws.FriendApplicationApprovedTips{FromToUserID: &sdkws.FromToUserID{
FromUserID: req.FromUserID,
ToUserID: req.ToUserID,
}, HandleMsg: req.HandleMsg}
return f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips)
f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips)
}
func (f *FriendNotificationSender) FriendApplicationRefusedNotification(
ctx context.Context,
req *pbfriend.RespondFriendApplyReq,
) error {
) {
tips := sdkws.FriendApplicationApprovedTips{FromToUserID: &sdkws.FromToUserID{
FromUserID: req.FromUserID,
ToUserID: req.ToUserID,
}, HandleMsg: req.HandleMsg}
return f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationRejectedNotification, &tips)
f.Notification(ctx, req.ToUserID, req.FromUserID, constant.FriendApplicationRejectedNotification, &tips)
}
func (f *FriendNotificationSender) FriendAddedNotification(
@ -178,36 +178,37 @@ func (f *FriendNotificationSender) FriendAddedNotification(
if err != nil {
return err
}
return f.Notification(ctx, fromUserID, toUserID, constant.FriendAddedNotification, &tips)
f.Notification(ctx, fromUserID, toUserID, constant.FriendAddedNotification, &tips)
return nil
}
func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context, req *pbfriend.DeleteFriendReq) error {
func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context, req *pbfriend.DeleteFriendReq) {
tips := sdkws.FriendDeletedTips{FromToUserID: &sdkws.FromToUserID{
FromUserID: req.OwnerUserID,
ToUserID: req.FriendUserID,
}}
return f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
}
func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) error {
func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) {
tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}}
tips.FromToUserID.FromUserID = fromUserID
tips.FromToUserID.ToUserID = toUserID
return f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
}
func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) error {
func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) {
tips := sdkws.FriendsInfoUpdateTips{FromToUserID: &sdkws.FromToUserID{}}
tips.FromToUserID.ToUserID = toUserID
tips.FriendIDs = friendIDs
return f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips)
f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips)
}
func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) error {
func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) {
tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}}
tips.FromToUserID.FromUserID = req.OwnerUserID
tips.FromToUserID.ToUserID = req.BlackUserID
return f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackAddedNotification, &tips)
f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackAddedNotification, &tips)
}
func (f *FriendNotificationSender) BlackDeletedNotification(ctx context.Context, req *pbfriend.RemoveBlackReq) {
@ -215,15 +216,10 @@ func (f *FriendNotificationSender) BlackDeletedNotification(ctx context.Context,
FromUserID: req.OwnerUserID,
ToUserID: req.BlackUserID,
}}
if err := f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackDeletedNotification, &blackDeletedTips); err != nil {
// err
}
f.Notification(ctx, req.OwnerUserID, req.BlackUserID, constant.BlackDeletedNotification, &blackDeletedTips)
}
func (f *FriendNotificationSender) FriendInfoUpdatedNotification(ctx context.Context, changedUserID string, needNotifiedUserID string) {
tips := sdkws.UserInfoUpdatedTips{UserID: changedUserID}
if err := f.Notification(ctx, mcontext.GetOpUserID(ctx), needNotifiedUserID,
constant.FriendInfoUpdatedNotification, &tips); err != nil {
// err
}
f.Notification(ctx, mcontext.GetOpUserID(ctx), needNotifiedUserID, constant.FriendInfoUpdatedNotification, &tips)
}

@ -20,7 +20,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"math/big"
"math/rand"
"strconv"
@ -55,11 +54,6 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type groupServer struct {
db controller.GroupDatabase
user rpcclient.UserRpcClient
@ -120,7 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
gs.config = config
gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize))
gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
pbgroup.RegisterGroupServer(server, &gs)
return nil
}
@ -138,9 +132,7 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro
groupIDs = append(groupIDs, member.GroupID)
}
for _, groupID := range groupIDs {
if err = s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID); err != nil {
return nil, err
}
s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID)
}
if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil {
return nil, err
@ -929,7 +921,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq)
if err != nil {
return nil, err
}
_ = s.notification.MemberQuitNotification(ctx, s.groupMemberDB2PB(member, 0))
s.notification.MemberQuitNotification(ctx, s.groupMemberDB2PB(member, 0))
if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, []string{req.UserID}); err != nil {
return nil, err
}
@ -1024,14 +1016,14 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
log.ZWarn(ctx, "SetConversations", err, resp.UserIDs, conversation)
}
}()
_ = s.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser})
s.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser})
}
if req.GroupInfoForSet.GroupName != "" {
num--
_ = s.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser})
s.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser})
}
if num > 0 {
_ = s.notification.GroupInfoSetNotification(ctx, tips)
s.notification.GroupInfoSetNotification(ctx, tips)
}
s.webhookAfterSetGroupInfo(ctx, &s.config.WebhooksConfig.AfterSetGroupInfo, req)

@ -287,114 +287,122 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
return nil
}
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) (err error) {
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips)
}
func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) (err error) {
func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName())
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName())
}
func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) (err error) {
func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips)
}
func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) (err error) {
func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName())
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName())
}
func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) (err error) {
func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, req.GroupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, req.GroupID)
if err != nil {
return err
return
}
user, err := g.getUser(ctx, req.InviterUserID)
var user *sdkws.PublicUserInfo
user, err = g.getUser(ctx, req.InviterUserID)
if err != nil {
return err
return
}
userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID)
if err != nil {
return err
return
}
userIDs = append(userIDs, req.InviterUserID, mcontext.GetOpUserID(ctx))
tips := &sdkws.JoinGroupApplicationTips{Group: group, Applicant: user, ReqMsg: req.ReqMessage}
for _, userID := range datautil.Distinct(userIDs) {
err = g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.JoinGroupApplicationNotification, tips)
if err != nil {
log.ZError(ctx, "JoinGroupApplicationNotification failed", err, "group", req.GroupID, "userID", userID)
}
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.JoinGroupApplicationNotification, tips)
}
return nil
}
func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) (err error) {
func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, member.GroupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, member.GroupID)
if err != nil {
return err
return
}
tips := &sdkws.MemberQuitTips{Group: group, QuitUser: member}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips)
}
func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) (err error) {
func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, req.GroupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, req.GroupID)
if err != nil {
return err
return
}
userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID)
var userIDs []string
userIDs, err = g.getGroupOwnerAndAdminUserID(ctx, req.GroupID)
if err != nil {
return err
return
}
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
return
}
for _, userID := range append(userIDs, req.FromUserID) {
if userID == req.FromUserID {
@ -402,31 +410,30 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
} else {
tips.ReceiverAs = 1
}
err = g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips)
if err != nil {
log.ZError(ctx, "failed", err)
}
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips)
}
return nil
}
func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) (err error) {
func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, req.GroupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, req.GroupID)
if err != nil {
return err
return
}
userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID)
var userIDs []string
userIDs, err = g.getGroupOwnerAndAdminUserID(ctx, req.GroupID)
if err != nil {
return err
return
}
tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
return
}
for _, userID := range append(userIDs, req.FromUserID) {
if userID == req.FromUserID {
@ -434,254 +441,281 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
} else {
tips.ReceiverAs = 1
}
err = g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips)
if err != nil {
log.ZError(ctx, "failed", err)
}
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips)
}
return nil
}
func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) (err error) {
func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, req.GroupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, req.GroupID)
if err != nil {
return err
return
}
opUserID := mcontext.GetOpUserID(ctx)
member, err := g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID})
var member map[string]*sdkws.GroupMemberFullInfo
member, err = g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID})
if err != nil {
return err
return
}
tips := &sdkws.GroupOwnerTransferredTips{Group: group, OpUser: member[opUserID], NewGroupOwner: member[req.NewOwnerUserID]}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips)
}
func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) (err error) {
func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}
func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) (err error) {
func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, groupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
var users []*sdkws.GroupMemberFullInfo
users, err = g.getGroupMembers(ctx, groupID, invitedUserIDList)
if err != nil {
return err
}
users, err := g.getGroupMembers(ctx, groupID, invitedUserIDList)
if err != nil {
return err
return
}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
}
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) (err error) {
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, groupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
user, err := g.getGroupMember(ctx, groupID, entrantUserID)
var user *sdkws.GroupMemberFullInfo
user, err = g.getGroupMember(ctx, groupID, entrantUserID)
if err != nil {
return err
return
}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
}
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) (err error) {
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) (err error) {
func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, groupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
var user map[string]*sdkws.GroupMemberFullInfo
user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
if err != nil {
return err
return
}
tips := &sdkws.GroupMemberMutedTips{
Group: group, MutedSeconds: mutedSeconds,
OpUser: user[mcontext.GetOpUserID(ctx)], MutedUser: user[groupMemberUserID],
}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, groupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
var user map[string]*sdkws.GroupMemberFullInfo
user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
if err != nil {
return err
return
}
tips := &sdkws.GroupMemberCancelMutedTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], MutedUser: user[groupMemberUserID]}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, groupID string) (err error) {
func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, groupID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, groupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
users, err := g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)})
var users []*sdkws.GroupMemberFullInfo
users, err = g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)})
if err != nil {
return err
return
}
tips := &sdkws.GroupMutedTips{Group: group}
if len(users) > 0 {
tips.OpUser = users[0]
}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) (err error) {
func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, groupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
users, err := g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)})
var users []*sdkws.GroupMemberFullInfo
users, err = g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)})
if err != nil {
return err
return
}
tips := &sdkws.GroupCancelMutedTips{Group: group}
if len(users) > 0 {
tips.OpUser = users[0]
}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
group, err := g.getGroupInfo(ctx, groupID)
func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
user, err := g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID})
var user map[string]*sdkws.GroupMemberFullInfo
user, err = g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID})
if err != nil {
return err
return
}
tips := &sdkws.GroupMemberInfoSetTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], ChangedUser: user[groupMemberUserID]}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
group, err := g.getGroupInfo(ctx, groupID)
func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
if err != nil {
return err
return
}
tips := &sdkws.GroupMemberInfoSetTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], ChangedUser: user[groupMemberUserID]}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips)
}
func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {
func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
group, err := g.getGroupInfo(ctx, groupID)
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return err
return
}
user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
var user map[string]*sdkws.GroupMemberFullInfo
user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
if err != nil {
return err
return
}
tips := &sdkws.GroupMemberInfoSetTips{Group: group, OpUser: user[mcontext.GetOpUserID(ctx)], ChangedUser: user[groupMemberUserID]}
if err := g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return err
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return
}
return g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips)
}
func (g *GroupNotificationSender) SuperGroupNotification(ctx context.Context, sendID, recvID string) (err error) {
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
err = g.Notification(ctx, sendID, recvID, constant.SuperGroupUpdateNotification, nil)
return err
func (g *GroupNotificationSender) SuperGroupNotification(ctx context.Context, sendID, recvID string) {
g.Notification(ctx, sendID, recvID, constant.SuperGroupUpdateNotification, nil)
}

@ -83,9 +83,7 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
return &msg.SetConversationHasReadSeqResp{}, nil
}
@ -126,11 +124,8 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
ContentType: conversation.ConversationType,
}
m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback)
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq)
return &msg.MarkMsgsAsReadResp{}, nil
}
@ -169,11 +164,8 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
}
hasReadSeq = req.HasReadSeq
}
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq)
} else if conversation.ConversationType == constant.SuperGroupChatType ||
conversation.ConversationType == constant.NotificationChatType {
if req.HasReadSeq > hasReadSeq {
@ -183,11 +175,8 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
}
hasReadSeq = req.HasReadSeq
}
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
req.UserID, seqs, hasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
req.UserID, seqs, hasReadSeq)
}
reqCall := &cbapi.CallbackGroupMsgReadReq{
@ -201,16 +190,13 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
return &msg.MarkConversationAsReadResp{}, nil
}
func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error {
func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) {
tips := &sdkws.MarkAsReadTips{
MarkAsReadUserID: sendID,
ConversationID: conversationID,
Seqs: seqs,
HasReadSeq: hasReadSeq,
}
err := m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
if err != nil {
log.ZWarn(ctx, "send has read Receipt err", err)
}
return nil
m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
}

@ -30,21 +30,21 @@ func NewMsgNotificationSender(config *Config, opts ...rpcclient.NotificationSend
return &MsgNotificationSender{rpcclient.NewNotificationSender(&config.NotificationConfig, opts...)}
}
func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) error {
func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) {
tips := sdkws.DeleteMsgsTips{
UserID: userID,
ConversationID: conversationID,
Seqs: seqs,
}
return m.Notification(ctx, userID, userID, constant.DeleteMsgsNotification, &tips)
m.Notification(ctx, userID, userID, constant.DeleteMsgsNotification, &tips)
}
func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error {
func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) {
tips := &sdkws.MarkAsReadTips{
MarkAsReadUserID: sendID,
ConversationID: conversationID,
Seqs: seqs,
HasReadSeq: hasReadSeq,
}
return m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sesstionType, tips)
m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sesstionType, tips)
}

@ -123,9 +123,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
} else {
recvID = msgs[0].RecvID
}
if err := m.notificationSender.NotificationWithSessionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil {
return nil, err
}
m.notificationSender.NotificationWithSessionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips)
m.webhookAfterRevokeMsg(ctx, &m.config.WebhooksConfig.AfterRevokeMsg, req)
return &msg.RevokeMsgResp{}, nil
}

@ -18,7 +18,6 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
@ -34,11 +33,6 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type (
// MessageInterceptorChain defines a chain of message interceptor functions.
MessageInterceptorChain []MessageInterceptorFunc
@ -109,7 +103,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, &config.LocalCacheConfig, rdb),
FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, &config.LocalCacheConfig, rdb),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
}
s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))

@ -97,24 +97,24 @@ func NewUserNotificationSender(config *Config, msgRpcClient *rpcclient.MessageRp
func (u *UserNotificationSender) UserStatusChangeNotification(
ctx context.Context,
tips *sdkws.UserStatusChangeTips,
) error {
return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserStatusChangeNotification, tips)
) {
u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserStatusChangeNotification, tips)
}
func (u *UserNotificationSender) UserCommandUpdateNotification(
ctx context.Context,
tips *sdkws.UserCommandUpdateTips,
) error {
return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandUpdateNotification, tips)
) {
u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandUpdateNotification, tips)
}
func (u *UserNotificationSender) UserCommandAddNotification(
ctx context.Context,
tips *sdkws.UserCommandAddTips,
) error {
return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandAddNotification, tips)
) {
u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandAddNotification, tips)
}
func (u *UserNotificationSender) UserCommandDeleteNotification(
ctx context.Context,
tips *sdkws.UserCommandDeleteTips,
) error {
return u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandDeleteNotification, tips)
) {
u.Notification(ctx, tips.FromUserID, tips.ToUserID, constant.UserCommandDeleteNotification, tips)
}

@ -19,7 +19,6 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/db/redisutil"
"math/rand"
"strings"
@ -46,11 +45,6 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type userServer struct {
db controller.UserDatabase
friendNotificationSender *friend.FriendNotificationSender
@ -107,7 +101,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
friendNotificationSender: friend.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, friend.WithDBFunc(database.FindWithError)),
userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
}
pbuser.RegisterUserServer(server, u)
return u.db.InitOnce(context.Background(), users)
@ -142,7 +136,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err
}
_ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
@ -174,7 +168,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
if err = s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err
}
_ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
@ -418,10 +412,7 @@ func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.Proc
FromUserID: req.UserID,
ToUserID: req.UserID,
}
err = s.userNotificationSender.UserCommandAddNotification(ctx, tips)
if err != nil {
return nil, err
}
s.userNotificationSender.UserCommandAddNotification(ctx, tips)
return &pbuser.ProcessUserCommandAddResp{}, nil
}
@ -440,11 +431,7 @@ func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.P
FromUserID: req.UserID,
ToUserID: req.UserID,
}
err = s.userNotificationSender.UserCommandDeleteNotification(ctx, tips)
if err != nil {
return nil, err
}
s.userNotificationSender.UserCommandDeleteNotification(ctx, tips)
return &pbuser.ProcessUserCommandDeleteResp{}, nil
}
@ -473,10 +460,7 @@ func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.P
FromUserID: req.UserID,
ToUserID: req.UserID,
}
err = s.userNotificationSender.UserCommandUpdateNotification(ctx, tips)
if err != nil {
return nil, err
}
s.userNotificationSender.UserCommandUpdateNotification(ctx, tips)
return &pbuser.ProcessUserCommandUpdateResp{}, nil
}

@ -142,9 +142,7 @@ func (c *MsgTool) ConversationsDestructMsgs() {
log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
}
c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
}

@ -1,51 +0,0 @@
package notification
import (
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/utils/httputil"
"net/http"
)
package webhook
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/httputil"
"net/http"
)
type Client struct {
url string
queue *memAsyncQueue.MemoryQueue
}
func NewWebhookClient(url string, queue *memAsyncQueue.MemoryQueue) *Client {
http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // Enhance the default number of max connections per host
return &Client{
url: url,
queue: queue,
}
}
func (c *Client) SyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, before *config.BeforeConfig) error {
if before.Enable {
return c.post(ctx, command, req, resp, before.Timeout)
}
return nil
}
func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig) {
if after.Enable {
c.queue.Push(func() { c.post(ctx, command, req, resp, after.Timeout) })
}
}

@ -33,8 +33,21 @@ type Client struct {
queue *memAsyncQueue.MemoryQueue
}
func NewWebhookClient(url string, queue *memAsyncQueue.MemoryQueue) *Client {
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
func NewWebhookClient(url string, options ...*memAsyncQueue.MemoryQueue) *Client {
var queue *memAsyncQueue.MemoryQueue
if len(options) > 0 && options[0] != nil {
queue = options[0]
} else {
queue = memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)
}
http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // Enhance the default number of max connections per host
return &Client{
client: httputil.NewHTTPClient(httputil.NewClientConfig()),
url: url,

@ -17,14 +17,12 @@ package rpcclient
import (
"context"
"encoding/json"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/idutil"
@ -217,6 +215,13 @@ type NotificationSender struct {
sessionTypeConf map[int32]int32
sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error)
getUserInfo func(ctx context.Context, userID string) (*sdkws.UserInfo, error)
queue *memAsyncQueue.MemoryQueue
}
func WithQueue(queue *memAsyncQueue.MemoryQueue) NotificationSenderOptions {
return func(s *NotificationSender) {
s.queue = queue
}
}
type NotificationSenderOptions func(*NotificationSender)
@ -239,11 +244,19 @@ func WithUserRpcClient(userRpcClient *UserRpcClient) NotificationSenderOptions {
}
}
const (
notificationWorkerCount = 2
notificationBufferSize = 200
)
func NewNotificationSender(conf *config.Notification, opts ...NotificationSenderOptions) *NotificationSender {
notificationSender := &NotificationSender{contentTypeConf: newContentTypeConf(conf), sessionTypeConf: newSessionTypeConf()}
for _, opt := range opts {
opt(notificationSender)
}
if notificationSender.queue == nil {
notificationSender.queue = memAsyncQueue.NewMemoryQueue(notificationWorkerCount, notificationBufferSize)
}
return notificationSender
}
@ -259,11 +272,12 @@ func WithRpcGetUserName() NotificationOptions {
}
}
func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) {
func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) {
n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)}
content, err := json.Marshal(&n)
if err != nil {
return errs.WrapMsg(err, "json.Marshal failed", "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", jsonutil.StructToJsonString(m))
log.ZError(ctx, "json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", jsonutil.StructToJsonString(m))
return
}
notificationOpt := &notificationOpt{}
for _, opt := range opts {
@ -275,7 +289,8 @@ func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, se
if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil {
userInfo, err = s.getUserInfo(ctx, sendID)
if err != nil {
return errs.WrapMsg(err, "getUserInfo failed", "sendID", sendID)
log.ZError(ctx, "getUserInfo failed", err, "sendID", sendID)
return
}
msg.SenderNickname = userInfo.Nickname
msg.SenderFaceURL = userInfo.FaceURL
@ -303,13 +318,16 @@ func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, se
req.MsgData = &msg
_, err = s.sendMsg(ctx, &req)
if err != nil {
return errs.WrapMsg(err, "SendMsg failed", "req", fmt.Sprintf("%+v", req))
log.ZError(ctx, "SendMsg failed", err, "req", req.String())
}
return err
}
func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) error {
return s.NotificationWithSessionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...)
func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) {
s.queue.Push(func() { s.send(ctx, sendID, recvID, contentType, sesstionType, m, opts...) })
}
func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) {
s.NotificationWithSessionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...)
}
func (s *NotificationSender) SetOptionsByContentType(_ context.Context, options map[string]bool, contentType int32) {

Loading…
Cancel
Save