pull/2993/head
withchao 9 months ago
parent e9895062cb
commit 9625cc57be

@ -221,3 +221,8 @@ require (
golang.org/x/crypto v0.27.0 // indirect golang.org/x/crypto v0.27.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )
replace (
github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol
)

@ -16,6 +16,7 @@ package conversation
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort" "sort"
"time" "time"
@ -26,9 +27,6 @@ import (
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
pbgroup "github.com/openimsdk/protocol/group"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/convert"
@ -52,6 +50,9 @@ type conversationServer struct {
conversationNotificationSender *ConversationNotificationSender conversationNotificationSender *ConversationNotificationSender
config *Config config *Config
// todo
msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient
} }
type Config struct { type Config struct {
@ -118,19 +119,12 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
if len(conversations) == 0 { if len(conversations) == 0 {
return nil, errs.ErrRecordNotFound.Wrap() return nil, errs.ErrRecordNotFound.Wrap()
} }
maxSeqs, err := c.msgClient.GetMaxSeqs(ctx, conversationIDs)
maxSeqs, err := rpccall.ExtractField(ctx, pbmsg.GetMaxSeqsCaller.Invoke,
&pbmsg.GetMaxSeqsReq{ConversationIDs: conversationIDs},
(*pbmsg.SeqsInfoResp).GetMaxSeqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chatLogs, err := rpccall.ExtractField(ctx, pbmsg.GetMsgByConversationIDsCaller.Invoke, chatLogs, err := c.msgClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
&pbmsg.GetMsgByConversationIDsReq{
ConversationIDs: conversationIDs,
MaxSeqs: maxSeqs,
}, (*pbmsg.GetMsgByConversationIDsResp).GetMsgDatas)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -140,9 +134,7 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
return nil, err return nil, err
} }
hasReadSeqs, err := rpccall.ExtractField(ctx, pbmsg.GetHasReadSeqsCaller.Invoke, hasReadSeqs, err := c.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.UserID)
&pbmsg.GetHasReadSeqsReq{ConversationIDs: conversationIDs},
(*pbmsg.SeqsInfoResp).GetMaxSeqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -230,14 +222,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil") return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
} }
if req.Conversation.ConversationType == constant.WriteGroupChatType { if req.Conversation.ConversationType == constant.WriteGroupChatType {
groupInfo, err := rpccall.ExtractField(ctx, pbgroup.GetGroupsInfoCaller.Invoke, groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID)
&pbgroup.GetGroupsInfoReq{GroupIDs: []string{req.Conversation.GroupID}},
func(r *pbgroup.GetGroupsInfoResp) *sdkws.GroupInfo {
if len(r.GroupInfos) > 0 {
return r.GroupInfos[0]
}
return nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -444,14 +429,14 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
return nil, err return nil, err
} }
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID) conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
if _, err := pbmsg.SetUserConversationMaxSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: req.UserIDs, MaxSeq: 0}); err != nil { if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
return nil, err return nil, err
} }
return &pbconversation.CreateGroupChatConversationsResp{}, nil return &pbconversation.CreateGroupChatConversationsResp{}, nil
} }
func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) {
if _, err := pbmsg.SetUserConversationMaxSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MaxSeq: req.MaxSeq}); err != nil { if err := c.msgClient.SetUserConversationMaxSeq(ctx, req.ConversationID, req.OwnerUserID, req.MaxSeq); err != nil {
return nil, err return nil, err
} }
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
@ -465,7 +450,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
} }
func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) {
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMinSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MinSeq: req.MinSeq}); err != nil { if err := c.msgClient.SetUserConversationMin(ctx, req.ConversationID, req.OwnerUserID, req.MinSeq); err != nil {
return nil, err return nil, err
} }
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
@ -584,9 +569,7 @@ func (c *conversationServer) getConversationInfo(
} }
} }
if len(groupIDs) != 0 { if len(groupIDs) != 0 {
groupInfos, err := rpccall.ExtractField(ctx, pbgroup.GetGroupsInfoCaller.Invoke, groupInfos, err := c.groupClient.GetGroupsInfo(ctx, groupIDs)
&pbgroup.GetGroupsInfoReq{GroupIDs: groupIDs},
(*pbgroup.GetGroupsInfoResp).GetGroupInfos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -774,23 +757,22 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue continue
} }
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime} seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-conversation.MsgDestructTime)
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if resp.Seq <= 0 { if seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq) log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", seq)
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil { if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil {
return nil, err return nil, err
} }
continue continue
} }
resp.Seq++ seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil { if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, seq, latestMsgDestructTime); err != nil {
return nil, err return nil, err
} }
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime) log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", seq, "msgDestructTime", conversation.MsgDestructTime)
} }
return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil
} }
@ -800,8 +782,7 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
"latest_msg_destruct_time": latestMsgDestructTime, "latest_msg_destruct_time": latestMsgDestructTime,
} }
if minSeq >= 0 { if minSeq >= 0 {
req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq} if err := c.msgClient.SetUserConversationMin(ctx, conversationID, []string{ownerUserID}, minSeq); err != nil {
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil {
return err return err
} }
update["min_seq"] = minSeq update["min_seq"] = minSeq

@ -17,6 +17,7 @@ package group
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"math/big" "math/big"
"math/rand" "math/rand"
"strconv" "strconv"
@ -41,8 +42,6 @@ import (
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
pbconv "github.com/openimsdk/protocol/conversation" pbconv "github.com/openimsdk/protocol/conversation"
pbgroup "github.com/openimsdk/protocol/group" pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/wrapperspb" "github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
@ -63,6 +62,9 @@ type groupServer struct {
notification *GroupNotificationSender notification *GroupNotificationSender
config *Config config *Config
webhookClient *webhook.Client webhookClient *webhook.Client
// todo
msgClient *rpcli.MsgClient
conversationClient *rpcli.ConversationClient
} }
type Config struct { type Config struct {
@ -950,18 +952,11 @@ func (g *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq)
func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke, maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID)
&msg.GetConversationMaxSeqReq{ConversationID: conversationID},
(*msg.GetConversationMaxSeqResp).GetMaxSeq)
if err != nil { if err != nil {
return err return err
} }
return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, maxSeq)
return pbconv.SetConversationMaxSeqCaller.Execute(ctx, &pbconv.SetConversationMaxSeqReq{
ConversationID: conversationID,
OwnerUserID: userIDs,
MaxSeq: maxSeq,
})
} }
func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInfoReq) (*pbgroup.SetGroupInfoResp, error) { func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInfoReq) (*pbgroup.SetGroupInfoResp, error) {
@ -1037,11 +1032,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
return return
} }
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
if err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{
UserIDs: resp.UserIDs,
Conversation: conversation,
}); err != nil {
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
} }
}() }()
@ -1154,11 +1145,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI
} }
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
if err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{
UserIDs: resp.UserIDs,
Conversation: conversation,
}); err != nil {
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
} }
}() }()

@ -518,9 +518,13 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c
if !g.config.RpcConfig.EnableHistoryForNewMembers { if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke, maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke,
&msg.GetConversationMaxSeqReq{ConversationID: conversationID}, &msg.GetConversationMaxSeqReq{ConversationID: conversationID},
(*msg.GetConversationMaxSeqResp).GetMaxSeq) (*msg.GetConversationMaxSeqResp).GetMaxSeq)
maxSeq,err := g.
if err != nil { if err != nil {
return err return err
} }

@ -2,23 +2,13 @@ package msg
import ( import (
"context" "context"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "strings"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
) )
// hard delete in Database. // DestructMsgs hard delete in Database.
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) { func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err return nil, err
@ -61,70 +51,6 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
} }
// soft delete for user self
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
temp := convert.ConversationsPb2DB(req.Conversations)
batchNum := 100
errg, _ := errgroup.WithContext(ctx)
errg.SetLimit(100)
for i := 0; i < len(temp); i += batchNum {
batch := temp[i:min(i+batchNum, len(temp))]
errg.Go(func() error {
for _, conversation := range batch {
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(handleCtx, "User MsgsDestruct",
"conversationID", conversation.ConversationID,
"ownerUserID", conversation.OwnerUserID,
"msgDestructTime", conversation.MsgDestructTime,
"lastMsgDestructTime", conversation.LatestMsgDestructTime)
seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if len(seqs) > 0 {
minseq := datautil.Max(seqs...)
// update
if err := pbconv.UpdateConversationCaller.Execute(ctx, &pbconv.UpdateConversationReq{
ConversationID: conversation.ConversationID,
UserIDs: []string{conversation.OwnerUserID},
MinSeq: wrapperspb.Int64(minseq),
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
}); err != nil {
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if err := pbconv.SetConversationMinSeqCaller.Execute(ctx, &pbconv.SetConversationMinSeqReq{
ConversationID: conversation.ConversationID,
OwnerUserID: []string{conversation.OwnerUserID},
MinSeq: minseq,
}); err != nil {
return err
}
// if you need Notify SDK client userseq is update.
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
return nil
})
}
if err := errg.Wait(); err != nil {
return nil, err
}
return nil, nil
}
func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) { func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time) seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
if err != nil { if err != nil {

@ -21,7 +21,6 @@ import (
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/timeutil" "github.com/openimsdk/tools/utils/timeutil"
@ -75,22 +74,13 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil { if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
return nil, err return nil, err
} }
conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID)
conversations, err := rpccall.ExtractField(ctx, conversation.GetConversationsByConversationIDCaller.Invoke, &conversation.GetConversationsByConversationIDReq{
ConversationIDs: []string{req.ConversationID},
}, (*conversation.GetConversationsByConversationIDResp).GetConversations)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tips := &sdkws.DeleteMsgsTips{UserID: req.UserID, ConversationID: req.ConversationID, Seqs: req.Seqs} tips := &sdkws.DeleteMsgsTips{UserID: req.UserID, ConversationID: req.ConversationID, Seqs: req.Seqs}
m.notificationSender.NotificationWithSessionType( m.notificationSender.NotificationWithSessionType(ctx, req.UserID, m.conversationAndGetRecvID(conv, req.UserID),
ctx, constant.DeleteMsgsNotification, conv.ConversationType, tips)
req.UserID,
m.conversationAndGetRecvID(conversations[0], req.UserID),
constant.DeleteMsgsNotification,
conversations[0].ConversationType,
tips,
)
} else { } else {
if err := m.MsgDatabase.DeleteUserMsgsBySeqs(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil { if err := m.MsgDatabase.DeleteUserMsgsBySeqs(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil {
return nil, err return nil, err
@ -125,9 +115,7 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
} }
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error { func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
conversations, err := rpccall.ExtractField(ctx, conversation.GetConversationsByConversationIDCaller.Invoke, &conversation.GetConversationsByConversationIDReq{ conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs)
ConversationIDs: conversationIDs,
}, (*conversation.GetConversationsByConversationIDResp).GetConversations)
if err != nil { if err != nil {
return err return err
} }
@ -150,11 +138,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
} }
ownerUserIDs := []string{userID} ownerUserIDs := []string{userID}
for conversationID, seq := range setSeqs { for conversationID, seq := range setSeqs {
if err := conversation.SetConversationMinSeqCaller.Execute(ctx, &conversation.SetConversationMinSeqReq{ if err := m.conversationClient.SetConversationMinSeq(ctx, conversationID, ownerUserIDs, seq); err != nil {
ConversationID: conversationID,
OwnerUserID: ownerUserIDs,
MinSeq: seq,
}); err != nil {
return err return err
} }
} }

@ -118,25 +118,14 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
} else { // @Everyone and @other people } else { // @Everyone and @other people
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe}
if err := m.conversationClient.SetConversations(ctx, atUserID, conversation); err != nil {
err = pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{
UserIDs: atUserID,
Conversation: conversation,
})
if err != nil {
log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation) log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation)
} }
memberUserIDList = datautil.Single(atUserID, memberUserIDList) memberUserIDList = datautil.Single(atUserID, memberUserIDList)
} }
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
if err := m.conversationClient.SetConversations(ctx, memberUserIDList, conversation); err != nil {
err = pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{
UserIDs: memberUserIDList,
Conversation: conversation,
})
if err != nil {
log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation) log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation)
} }
@ -144,11 +133,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
} }
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe} conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe}
err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{ if err := m.conversationClient.SetConversations(ctx, msg.AtUserIDList, conversation); err != nil {
UserIDs: msg.AtUserIDList,
Conversation: conversation,
})
if err != nil {
log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation) log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation)
} }
} }

@ -16,6 +16,7 @@ package msg
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
@ -36,39 +37,40 @@ import (
) )
type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error)
type (
// MessageInterceptorChain defines a chain of message interceptor functions.
MessageInterceptorChain []MessageInterceptorFunc
// MsgServer encapsulates dependencies required for message handling. // MessageInterceptorChain defines a chain of message interceptor functions.
msgServer struct { type MessageInterceptorChain []MessageInterceptorFunc
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
StreamMsgDatabase controller.StreamMsgDatabase
UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data.
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
msg.UnimplementedMsgServer
}
Config struct { type Config struct {
RpcConfig config.Msg RpcConfig config.Msg
RedisConfig config.Redis RedisConfig config.Redis
MongodbConfig config.Mongo MongodbConfig config.Mongo
KafkaConfig config.Kafka KafkaConfig config.Kafka
NotificationConfig config.Notification NotificationConfig config.Notification
Share config.Share Share config.Share
WebhooksConfig config.Webhooks WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache LocalCacheConfig config.LocalCache
Discovery config.Discovery Discovery config.Discovery
} }
)
// MsgServer encapsulates dependencies required for message handling.
type msgServer struct {
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
StreamMsgDatabase controller.StreamMsgDatabase
UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data.
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
msg.UnimplementedMsgServer
// todo
conversationClient rpcli.ConversationClient
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
m.Handlers = append(m.Handlers, interceptorFunc...) m.Handlers = append(m.Handlers, interceptorFunc...)

@ -8,9 +8,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
) )
@ -74,10 +72,7 @@ func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMs
if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil {
return nil, err return nil, err
} }
conversation, err := rpccall.ExtractField(ctx, pbconv.GetConversationCaller.Invoke, &pbconv.GetConversationReq{ conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID)
ConversationID: res.ConversationID,
OwnerUserID: res.UserID,
}, (*pbconv.GetConversationResp).GetConversation)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -17,6 +17,7 @@ package user
import ( import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"math/rand" "math/rand"
"strings" "strings"
"sync" "sync"
@ -59,6 +60,9 @@ type userServer struct {
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
config *Config config *Config
webhookClient *webhook.Client webhookClient *webhook.Client
// todo
groupClient *rpcli.GroupClient
relationClient *rpcli.RelationClient
} }
type Config struct { type Config struct {
@ -633,7 +637,7 @@ func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID stri
wg.Add(len(es)) wg.Add(len(es))
go func() { go func() {
defer wg.Done() defer wg.Done()
_, es[0] = group.NotificationUserInfoUpdateCaller.Invoke(ctx, &group.NotificationUserInfoUpdateReq{ _, es[0] = s.groupClient.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{
UserID: userID, UserID: userID,
OldUserInfo: oldUserInfo, OldUserInfo: oldUserInfo,
NewUserInfo: newUserInfo, NewUserInfo: newUserInfo,
@ -642,7 +646,7 @@ func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID stri
go func() { go func() {
defer wg.Done() defer wg.Done()
_, es[1] = friendpb.NotificationUserInfoUpdateCaller.Invoke(ctx, &friendpb.NotificationUserInfoUpdateReq{ _, es[1] = s.relationClient.NotificationUserInfoUpdate(ctx, &friendpb.NotificationUserInfoUpdateReq{
UserID: userID, UserID: userID,
OldUserInfo: oldUserInfo, OldUserInfo: oldUserInfo,
NewUserInfo: newUserInfo, NewUserInfo: newUserInfo,

@ -17,6 +17,7 @@ package rpcclient
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"time" "time"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -132,6 +133,8 @@ type NotificationSender struct {
sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error)
getUserInfo func(ctx context.Context, userID string) (*sdkws.UserInfo, error) getUserInfo func(ctx context.Context, userID string) (*sdkws.UserInfo, error)
queue *memamq.MemoryQueue queue *memamq.MemoryQueue
// todo
msgClient *rpcli.MsgClient
} }
func WithQueue(queue *memamq.MemoryQueue) NotificationSenderOptions { func WithQueue(queue *memamq.MemoryQueue) NotificationSenderOptions {
@ -151,7 +154,7 @@ func WithLocalSendMsg(sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*m
func WithRpcClient() NotificationSenderOptions { func WithRpcClient() NotificationSenderOptions {
return func(s *NotificationSender) { return func(s *NotificationSender) {
s.sendMsg = func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { s.sendMsg = func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
return msg.SendMsgCaller.Invoke(ctx, req) return s.msgClient.SendMsg(ctx, req)
} }
} }
} }

@ -0,0 +1,11 @@
package rpcli
import "github.com/openimsdk/protocol/auth"
func NewAuthClient(cli auth.AuthClient) *AuthClient {
return &AuthClient{cli}
}
type AuthClient struct {
auth.AuthClient
}

@ -0,0 +1,48 @@
package rpcli
import (
"context"
"github.com/openimsdk/protocol/conversation"
)
func NewConversationClient(cli conversation.ConversationClient) *ConversationClient {
return &ConversationClient{cli}
}
type ConversationClient struct {
conversation.ConversationClient
}
func (x *ConversationClient) SetConversationMaxSeq(ctx context.Context, conversationID string, ownerUserIDs []string, maxSeq int64) error {
req := &conversation.SetConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: ownerUserIDs, MaxSeq: maxSeq}
return ignoreResp(x.ConversationClient.SetConversationMaxSeq(ctx, req))
}
func (x *ConversationClient) SetConversations(ctx context.Context, userIDs []string, info *conversation.ConversationReq) error {
req := &conversation.SetConversationsReq{UserIDs: userIDs, Conversation: info}
return ignoreResp(x.ConversationClient.SetConversations(ctx, req))
}
func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs}
return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations)
}
func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) {
return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID}))
}
func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
req := &conversation.SetConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: ownerUserIDs, MinSeq: minSeq}
return ignoreResp(x.ConversationClient.SetConversationMinSeq(ctx, req))
}
func (x *ConversationClient) GetConversation(ctx context.Context, conversationID string, ownerUserID string) (*conversation.Conversation, error) {
req := &conversation.GetConversationReq{ConversationID: conversationID, OwnerUserID: ownerUserID}
return extractField(ctx, x.ConversationClient.GetConversation, req, (*conversation.GetConversationResp).GetConversation)
}
func (x *ConversationClient) GetConversations(ctx context.Context, conversationIDs []string, ownerUserID string) ([]*conversation.Conversation, error) {
req := &conversation.GetConversationsReq{ConversationIDs: conversationIDs, OwnerUserID: ownerUserID}
return extractField(ctx, x.ConversationClient.GetConversations, req, (*conversation.GetConversationsResp).GetConversations)
}

@ -0,0 +1,28 @@
package rpcli
import (
"context"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
)
func NewGroupClient(cli group.GroupClient) *GroupClient {
return &GroupClient{cli}
}
type GroupClient struct {
group.GroupClient
}
func (x *GroupClient) cli() group.GroupClient {
return x.GroupClient
}
func (x *GroupClient) GetGroupsInfo(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) {
req := &group.GetGroupsInfoReq{GroupIDs: groupIDs}
return extractField(ctx, x.cli().GetGroupsInfo, req, (*group.GetGroupsInfoResp).GetGroupInfos)
}
func (x *GroupClient) GetGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) {
return firstValue(x.GetGroupsInfo(ctx, []string{groupID}))
}

@ -0,0 +1,54 @@
package rpcli
import (
"context"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
)
func NewMsgClient(cli msg.MsgClient) *MsgClient {
return &MsgClient{cli}
}
type MsgClient struct {
msg.MsgClient
}
func (x *MsgClient) cli() msg.MsgClient {
return x.MsgClient
}
func (x *MsgClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
req := &msg.GetMaxSeqsReq{ConversationIDs: conversationIDs}
return extractField(ctx, x.cli().GetMaxSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs)
}
func (x *MsgClient) GetMsgByConversationIDs(ctx context.Context, conversationIDs []string, maxSeqs map[string]int64) (map[string]*sdkws.MsgData, error) {
req := &msg.GetMsgByConversationIDsReq{ConversationIDs: conversationIDs, MaxSeqs: maxSeqs}
return extractField(ctx, x.cli().GetMsgByConversationIDs, req, (*msg.GetMsgByConversationIDsResp).GetMsgDatas)
}
func (x *MsgClient) GetHasReadSeqs(ctx context.Context, conversationIDs []string, userID string) (map[string]int64, error) {
req := &msg.GetHasReadSeqsReq{ConversationIDs: conversationIDs, UserID: userID}
return extractField(ctx, x.cli().GetHasReadSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs)
}
func (x *MsgClient) SetUserConversationMaxSeq(ctx context.Context, conversationID string, ownerUserIDs []string, maxSeq int64) error {
req := &msg.SetUserConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: ownerUserIDs, MaxSeq: maxSeq}
return ignoreResp(x.cli().SetUserConversationMaxSeq(ctx, req))
}
func (x *MsgClient) SetUserConversationMin(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
req := &msg.SetUserConversationsMinSeqReq{ConversationID: conversationID, UserIDs: ownerUserIDs, Seq: minSeq}
return ignoreResp(x.cli().SetUserConversationsMinSeq(ctx, req))
}
func (x *MsgClient) GetLastMessageSeqByTime(ctx context.Context, conversationID string, lastTime int64) (int64, error) {
req := &msg.GetLastMessageSeqByTimeReq{ConversationID: conversationID, Time: lastTime}
return extractField(ctx, x.cli().GetLastMessageSeqByTime, req, (*msg.GetLastMessageSeqByTimeResp).GetSeq)
}
func (x *MsgClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) {
req := &msg.GetConversationMaxSeqReq{ConversationID: conversationID}
return extractField(ctx, x.cli().GetConversationMaxSeq, req, (*msg.GetConversationMaxSeqResp).GetMaxSeq)
}

@ -0,0 +1,13 @@
package rpcli
import (
"github.com/openimsdk/protocol/msggateway"
)
func NewMsgGatewayClient(cli msggateway.MsgGatewayClient) *MsgGatewayClient {
return &MsgGatewayClient{cli}
}
type MsgGatewayClient struct {
msggateway.MsgGatewayClient
}

@ -0,0 +1,13 @@
package rpcli
import (
"github.com/openimsdk/protocol/push"
)
func NewPushMsgServiceClient(cli push.PushMsgServiceClient) *PushMsgServiceClient {
return &PushMsgServiceClient{cli}
}
type PushMsgServiceClient struct {
push.PushMsgServiceClient
}

@ -0,0 +1,11 @@
package rpcli
import "github.com/openimsdk/protocol/relation"
func NewRelationClient(cli relation.FriendClient) *RelationClient {
return &RelationClient{cli}
}
type RelationClient struct {
relation.FriendClient
}

@ -0,0 +1,13 @@
package rpcli
import (
"github.com/openimsdk/protocol/rtc"
)
func NewRtcServiceClient(cli rtc.RtcServiceClient) *RtcServiceClient {
return &RtcServiceClient{cli}
}
type RtcServiceClient struct {
rtc.RtcServiceClient
}

@ -0,0 +1,11 @@
package rpcli
import "github.com/openimsdk/protocol/third"
func NewThirdClient(cli third.ThirdClient) *ThirdClient {
return &ThirdClient{cli}
}
type ThirdClient struct {
third.ThirdClient
}

@ -0,0 +1,32 @@
package rpcli
import (
"context"
"github.com/openimsdk/tools/errs"
"google.golang.org/grpc"
)
func extractField[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
resp, err := fn(ctx, req)
if err != nil {
var c C
return c, err
}
return get(resp), nil
}
func firstValue[A any](val []A, err error) (A, error) {
if err != nil {
var a A
return a, err
}
if len(val) == 0 {
var a A
return a, errs.ErrRecordNotFound.WrapMsg("record not found")
}
return val[0], nil
}
func ignoreResp(_ any, err error) error {
return err
}

@ -0,0 +1,11 @@
package rpcli
import "github.com/openimsdk/protocol/user"
func NewUserClient(cli user.UserClient) *UserClient {
return &UserClient{cli}
}
type UserClient struct {
user.UserClient
}
Loading…
Cancel
Save