You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
350 lines
15 KiB
350 lines
15 KiB
// Copyright © 2023 OpenIM. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rpcclient
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
"github.com/openimsdk/protocol/constant"
|
|
"github.com/openimsdk/protocol/msg"
|
|
"github.com/openimsdk/protocol/sdkws"
|
|
"github.com/openimsdk/tools/discovery"
|
|
"github.com/openimsdk/tools/log"
|
|
"github.com/openimsdk/tools/mcontext"
|
|
"github.com/openimsdk/tools/mq/memamq"
|
|
"github.com/openimsdk/tools/system/program"
|
|
"github.com/openimsdk/tools/utils/idutil"
|
|
"github.com/openimsdk/tools/utils/jsonutil"
|
|
"github.com/openimsdk/tools/utils/timeutil"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/protobuf/proto"
|
|
"time"
|
|
)
|
|
|
|
func newContentTypeConf(conf *config.Notification) map[int32]config.NotificationConfig {
|
|
return map[int32]config.NotificationConfig{
|
|
// group
|
|
constant.GroupCreatedNotification: conf.GroupCreated,
|
|
constant.GroupInfoSetNotification: conf.GroupInfoSet,
|
|
constant.JoinGroupApplicationNotification: conf.JoinGroupApplication,
|
|
constant.MemberQuitNotification: conf.MemberQuit,
|
|
constant.GroupApplicationAcceptedNotification: conf.GroupApplicationAccepted,
|
|
constant.GroupApplicationRejectedNotification: conf.GroupApplicationRejected,
|
|
constant.GroupOwnerTransferredNotification: conf.GroupOwnerTransferred,
|
|
constant.MemberKickedNotification: conf.MemberKicked,
|
|
constant.MemberInvitedNotification: conf.MemberInvited,
|
|
constant.MemberEnterNotification: conf.MemberEnter,
|
|
constant.GroupDismissedNotification: conf.GroupDismissed,
|
|
constant.GroupMutedNotification: conf.GroupMuted,
|
|
constant.GroupCancelMutedNotification: conf.GroupCancelMuted,
|
|
constant.GroupMemberMutedNotification: conf.GroupMemberMuted,
|
|
constant.GroupMemberCancelMutedNotification: conf.GroupMemberCancelMuted,
|
|
constant.GroupMemberInfoSetNotification: conf.GroupMemberInfoSet,
|
|
constant.GroupMemberSetToAdminNotification: conf.GroupMemberSetToAdmin,
|
|
constant.GroupMemberSetToOrdinaryUserNotification: conf.GroupMemberSetToOrdinary,
|
|
constant.GroupInfoSetAnnouncementNotification: conf.GroupInfoSetAnnouncement,
|
|
constant.GroupInfoSetNameNotification: conf.GroupInfoSetName,
|
|
// user
|
|
constant.UserInfoUpdatedNotification: conf.UserInfoUpdated,
|
|
constant.UserStatusChangeNotification: conf.UserStatusChanged,
|
|
// friend
|
|
constant.FriendApplicationNotification: conf.FriendApplicationAdded,
|
|
constant.FriendApplicationApprovedNotification: conf.FriendApplicationApproved,
|
|
constant.FriendApplicationRejectedNotification: conf.FriendApplicationRejected,
|
|
constant.FriendAddedNotification: conf.FriendAdded,
|
|
constant.FriendDeletedNotification: conf.FriendDeleted,
|
|
constant.FriendRemarkSetNotification: conf.FriendRemarkSet,
|
|
constant.BlackAddedNotification: conf.BlackAdded,
|
|
constant.BlackDeletedNotification: conf.BlackDeleted,
|
|
constant.FriendInfoUpdatedNotification: conf.FriendInfoUpdated,
|
|
constant.FriendsInfoUpdateNotification: conf.FriendInfoUpdated, // use the same FriendInfoUpdated
|
|
// conversation
|
|
constant.ConversationChangeNotification: conf.ConversationChanged,
|
|
constant.ConversationUnreadNotification: conf.ConversationChanged,
|
|
constant.ConversationPrivateChatNotification: conf.ConversationSetPrivate,
|
|
// msg
|
|
constant.MsgRevokeNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
|
|
constant.HasReadReceipt: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
|
|
constant.DeleteMsgsNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
|
|
}
|
|
}
|
|
|
|
func newSessionTypeConf() map[int32]int32 {
|
|
return map[int32]int32{
|
|
// group
|
|
constant.GroupCreatedNotification: constant.ReadGroupChatType,
|
|
constant.GroupInfoSetNotification: constant.ReadGroupChatType,
|
|
constant.JoinGroupApplicationNotification: constant.SingleChatType,
|
|
constant.MemberQuitNotification: constant.ReadGroupChatType,
|
|
constant.GroupApplicationAcceptedNotification: constant.SingleChatType,
|
|
constant.GroupApplicationRejectedNotification: constant.SingleChatType,
|
|
constant.GroupOwnerTransferredNotification: constant.ReadGroupChatType,
|
|
constant.MemberKickedNotification: constant.ReadGroupChatType,
|
|
constant.MemberInvitedNotification: constant.ReadGroupChatType,
|
|
constant.MemberEnterNotification: constant.ReadGroupChatType,
|
|
constant.GroupDismissedNotification: constant.ReadGroupChatType,
|
|
constant.GroupMutedNotification: constant.ReadGroupChatType,
|
|
constant.GroupCancelMutedNotification: constant.ReadGroupChatType,
|
|
constant.GroupMemberMutedNotification: constant.ReadGroupChatType,
|
|
constant.GroupMemberCancelMutedNotification: constant.ReadGroupChatType,
|
|
constant.GroupMemberInfoSetNotification: constant.ReadGroupChatType,
|
|
constant.GroupMemberSetToAdminNotification: constant.ReadGroupChatType,
|
|
constant.GroupMemberSetToOrdinaryUserNotification: constant.ReadGroupChatType,
|
|
constant.GroupInfoSetAnnouncementNotification: constant.ReadGroupChatType,
|
|
constant.GroupInfoSetNameNotification: constant.ReadGroupChatType,
|
|
// user
|
|
constant.UserInfoUpdatedNotification: constant.SingleChatType,
|
|
constant.UserStatusChangeNotification: constant.SingleChatType,
|
|
// friend
|
|
constant.FriendApplicationNotification: constant.SingleChatType,
|
|
constant.FriendApplicationApprovedNotification: constant.SingleChatType,
|
|
constant.FriendApplicationRejectedNotification: constant.SingleChatType,
|
|
constant.FriendAddedNotification: constant.SingleChatType,
|
|
constant.FriendDeletedNotification: constant.SingleChatType,
|
|
constant.FriendRemarkSetNotification: constant.SingleChatType,
|
|
constant.BlackAddedNotification: constant.SingleChatType,
|
|
constant.BlackDeletedNotification: constant.SingleChatType,
|
|
constant.FriendInfoUpdatedNotification: constant.SingleChatType,
|
|
constant.FriendsInfoUpdateNotification: constant.SingleChatType,
|
|
// conversation
|
|
constant.ConversationChangeNotification: constant.SingleChatType,
|
|
constant.ConversationUnreadNotification: constant.SingleChatType,
|
|
constant.ConversationPrivateChatNotification: constant.SingleChatType,
|
|
// delete
|
|
constant.DeleteMsgsNotification: constant.SingleChatType,
|
|
}
|
|
}
|
|
|
|
type Message struct {
|
|
conn grpc.ClientConnInterface
|
|
Client msg.MsgClient
|
|
discov discovery.SvcDiscoveryRegistry
|
|
}
|
|
|
|
func NewMessage(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Message {
|
|
conn, err := discov.GetConn(context.Background(), rpcRegisterName)
|
|
if err != nil {
|
|
program.ExitWithError(err)
|
|
}
|
|
client := msg.NewMsgClient(conn)
|
|
return &Message{discov: discov, conn: conn, Client: client}
|
|
}
|
|
|
|
type MessageRpcClient Message
|
|
|
|
func NewMessageRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) MessageRpcClient {
|
|
return MessageRpcClient(*NewMessage(discov, rpcRegisterName))
|
|
}
|
|
|
|
// SendMsg sends a message through the gRPC client and returns the response.
|
|
// It wraps any encountered error for better error handling and context understanding.
|
|
func (m *MessageRpcClient) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
|
|
resp, err := m.Client.SendMsg(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// GetMaxSeq retrieves the maximum sequence number from the gRPC client.
|
|
// Errors during the gRPC call are wrapped to provide additional context.
|
|
func (m *MessageRpcClient) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
|
|
resp, err := m.Client.GetMaxSeq(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (m *MessageRpcClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
|
|
log.ZDebug(ctx, "GetMaxSeqs", "conversationIDs", conversationIDs)
|
|
resp, err := m.Client.GetMaxSeqs(ctx, &msg.GetMaxSeqsReq{
|
|
ConversationIDs: conversationIDs,
|
|
})
|
|
return resp.MaxSeqs, err
|
|
}
|
|
|
|
func (m *MessageRpcClient) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
|
|
resp, err := m.Client.GetHasReadSeqs(ctx, &msg.GetHasReadSeqsReq{
|
|
UserID: userID,
|
|
ConversationIDs: conversationIDs,
|
|
})
|
|
return resp.MaxSeqs, err
|
|
}
|
|
|
|
func (m *MessageRpcClient) GetMsgByConversationIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
|
|
resp, err := m.Client.GetMsgByConversationIDs(ctx, &msg.GetMsgByConversationIDsReq{
|
|
ConversationIDs: docIDs,
|
|
MaxSeqs: seqs,
|
|
})
|
|
return resp.MsgDatas, err
|
|
}
|
|
|
|
// PullMessageBySeqList retrieves messages by their sequence numbers using the gRPC client.
|
|
// It directly forwards the request to the gRPC client and returns the response along with any error encountered.
|
|
func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
|
|
resp, err := m.Client.PullMessageBySeqs(ctx, req)
|
|
if err != nil {
|
|
// Wrap the error to provide more context if the gRPC call fails.
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
|
resp, err := m.Client.GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return resp.MaxSeq, nil
|
|
}
|
|
|
|
func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error {
|
|
_, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts})
|
|
return err
|
|
}
|
|
|
|
type NotificationSender struct {
|
|
contentTypeConf map[int32]config.NotificationConfig
|
|
sessionTypeConf map[int32]int32
|
|
sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error)
|
|
getUserInfo func(ctx context.Context, userID string) (*sdkws.UserInfo, error)
|
|
queue *memamq.MemoryQueue
|
|
}
|
|
|
|
func WithQueue(queue *memamq.MemoryQueue) NotificationSenderOptions {
|
|
return func(s *NotificationSender) {
|
|
s.queue = queue
|
|
}
|
|
}
|
|
|
|
type NotificationSenderOptions func(*NotificationSender)
|
|
|
|
func WithLocalSendMsg(sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error)) NotificationSenderOptions {
|
|
return func(s *NotificationSender) {
|
|
s.sendMsg = sendMsg
|
|
}
|
|
}
|
|
|
|
func WithRpcClient(msgRpcClient *MessageRpcClient) NotificationSenderOptions {
|
|
return func(s *NotificationSender) {
|
|
s.sendMsg = msgRpcClient.SendMsg
|
|
}
|
|
}
|
|
|
|
func WithUserRpcClient(userRpcClient *UserRpcClient) NotificationSenderOptions {
|
|
return func(s *NotificationSender) {
|
|
s.getUserInfo = userRpcClient.GetUserInfo
|
|
}
|
|
}
|
|
|
|
const (
|
|
notificationWorkerCount = 2
|
|
notificationBufferSize = 200
|
|
)
|
|
|
|
func NewNotificationSender(conf *config.Notification, opts ...NotificationSenderOptions) *NotificationSender {
|
|
notificationSender := &NotificationSender{contentTypeConf: newContentTypeConf(conf), sessionTypeConf: newSessionTypeConf()}
|
|
for _, opt := range opts {
|
|
opt(notificationSender)
|
|
}
|
|
if notificationSender.queue == nil {
|
|
notificationSender.queue = memamq.NewMemoryQueue(notificationWorkerCount, notificationBufferSize)
|
|
}
|
|
return notificationSender
|
|
}
|
|
|
|
type notificationOpt struct {
|
|
WithRpcGetUsername bool
|
|
}
|
|
|
|
type NotificationOptions func(*notificationOpt)
|
|
|
|
func WithRpcGetUserName() NotificationOptions {
|
|
return func(opt *notificationOpt) {
|
|
opt.WithRpcGetUsername = true
|
|
}
|
|
}
|
|
|
|
func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) {
|
|
ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5))
|
|
defer cancel()
|
|
n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)}
|
|
content, err := json.Marshal(&n)
|
|
if err != nil {
|
|
log.ZWarn(ctx, "json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", jsonutil.StructToJsonString(m))
|
|
return
|
|
}
|
|
notificationOpt := ¬ificationOpt{}
|
|
for _, opt := range opts {
|
|
opt(notificationOpt)
|
|
}
|
|
var req msg.SendMsgReq
|
|
var msg sdkws.MsgData
|
|
var userInfo *sdkws.UserInfo
|
|
if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil {
|
|
userInfo, err = s.getUserInfo(ctx, sendID)
|
|
if err != nil {
|
|
log.ZWarn(ctx, "getUserInfo failed", err, "sendID", sendID)
|
|
return
|
|
}
|
|
msg.SenderNickname = userInfo.Nickname
|
|
msg.SenderFaceURL = userInfo.FaceURL
|
|
}
|
|
var offlineInfo sdkws.OfflinePushInfo
|
|
msg.SendID = sendID
|
|
msg.RecvID = recvID
|
|
msg.Content = content
|
|
msg.MsgFrom = constant.SysMsgType
|
|
msg.ContentType = contentType
|
|
msg.SessionType = sessionType
|
|
if msg.SessionType == constant.ReadGroupChatType {
|
|
msg.GroupID = recvID
|
|
}
|
|
msg.CreateTime = timeutil.GetCurrentTimestampByMill()
|
|
msg.ClientMsgID = idutil.GetMsgIDByMD5(sendID)
|
|
optionsConfig := s.contentTypeConf[contentType]
|
|
if sendID == recvID && contentType == constant.HasReadReceipt {
|
|
optionsConfig.ReliabilityLevel = constant.UnreliableNotification
|
|
}
|
|
options := config.GetOptionsByNotification(optionsConfig)
|
|
s.SetOptionsByContentType(ctx, options, contentType)
|
|
msg.Options = options
|
|
msg.OfflinePushInfo = &offlineInfo
|
|
req.MsgData = &msg
|
|
_, err = s.sendMsg(ctx, &req)
|
|
if err != nil {
|
|
log.ZWarn(ctx, "SendMsg failed", err, "req", req.String())
|
|
}
|
|
}
|
|
|
|
func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) {
|
|
s.queue.Push(func() { s.send(ctx, sendID, recvID, contentType, sessionType, m, opts...) })
|
|
}
|
|
|
|
func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) {
|
|
s.NotificationWithSessionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...)
|
|
}
|
|
|
|
func (s *NotificationSender) SetOptionsByContentType(_ context.Context, options map[string]bool, contentType int32) {
|
|
switch contentType {
|
|
case constant.UserStatusChangeNotification:
|
|
options[constant.IsSenderSync] = false
|
|
default:
|
|
}
|
|
}
|