From 95d75ad7edf761a19823395ae6ae3c61e5132b8b Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 30 Jun 2023 14:38:19 +0800 Subject: [PATCH 1/4] middleware checker --- pkg/a2r/api2rpc.go | 3 ++- pkg/checker/check.go | 5 +++++ pkg/common/mw/rpc_server_interceptor.go | 6 ++++++ 3 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 pkg/checker/check.go diff --git a/pkg/a2r/api2rpc.go b/pkg/a2r/api2rpc.go index bb1cb1fac..4cfa3ee4c 100644 --- a/pkg/a2r/api2rpc.go +++ b/pkg/a2r/api2rpc.go @@ -2,6 +2,7 @@ package a2r import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/checker" "github.com/OpenIMSDK/Open-IM-Server/pkg/apiresp" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" @@ -21,7 +22,7 @@ func Call[A, B, C any]( apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) // 参数错误 return } - if check, ok := any(&req).(interface{ Check() error }); ok { + if check, ok := any(&req).(checker.Checker); ok { if err := check.Check(); err != nil { log.ZWarn(c, "custom check error", err, "req", req) apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数校验失败 diff --git a/pkg/checker/check.go b/pkg/checker/check.go new file mode 100644 index 000000000..f59986ec7 --- /dev/null +++ b/pkg/checker/check.go @@ -0,0 +1,5 @@ +package checker + +type Checker interface { + Check() error +} diff --git a/pkg/common/mw/rpc_server_interceptor.go b/pkg/common/mw/rpc_server_interceptor.go index 6d2df5459..933ab88f1 100644 --- a/pkg/common/mw/rpc_server_interceptor.go +++ b/pkg/common/mw/rpc_server_interceptor.go @@ -3,6 +3,7 @@ package mw import ( "context" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/checker" "math" "runtime" "strings" @@ -91,6 +92,11 @@ func RpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary return nil, status.New(codes.InvalidArgument, err.Error()).Err() } } + if err := req.(checker.Checker); err != nil { + if err := err.Check(); err != nil { + return nil, status.New(codes.InvalidArgument, err.Error()).Err() + } + } log.ZInfo(ctx, "rpc server req", "funcName", funcName, "req", rpcString(req)) resp, err = handler(ctx, req) if err == nil { From 1428be637fc22570ade73873d70c9ce61ec78195 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 30 Jun 2023 14:40:02 +0800 Subject: [PATCH 2/4] init rpc client once --- internal/rpc/msg/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 40e494fb1..c70d28e35 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -77,8 +77,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e MsgDatabase: msgDatabase, ExtendMsgDatabase: extendMsgDatabase, RegisterCenter: client, - GroupLocalCache: localcache.NewGroupLocalCache(client), - ConversationLocalCache: localcache.NewConversationLocalCache(client), + GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), + ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), friend: &friendRpcClient, MessageLocker: NewLockerMessage(cacheModel), } From cb7bebbbcbf4a793f43b5ba9f8ee2c104936fa2f Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 30 Jun 2023 14:40:59 +0800 Subject: [PATCH 3/4] init once rpc client --- internal/push/push_rpc_server.go | 6 +++++- internal/push/push_to_client.go | 16 +++++++--------- pkg/common/db/localcache/conversation.go | 7 +++---- pkg/common/db/localcache/group.go | 6 ++---- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 6890cc086..ca45a58fa 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -11,6 +11,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" + "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "google.golang.org/grpc" ) @@ -26,7 +27,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e cacheModel := cache.NewMsgCacheModel(rdb) offlinePusher := NewOfflinePusher(cacheModel) database := controller.NewPushDatabase(cacheModel) - pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client)) + groupRpcClient := rpcclient.NewGroupRpcClient(client) + conversationRpcClient := rpcclient.NewConversationRpcClient(client) + msgRpcClient := rpcclient.NewMessageRpcClient(client) + pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(&groupRpcClient), localcache.NewConversationLocalCache(&conversationRpcClient), &conversationRpcClient, &groupRpcClient, &msgRpcClient) var wg sync.WaitGroup wg.Add(2) go func() { diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index df38f06cf..7f08ee0aa 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -30,7 +30,7 @@ type Pusher struct { offlinePusher offlinepush.OfflinePusher groupLocalCache *localcache.GroupLocalCache conversationLocalCache *localcache.ConversationLocalCache - msgClient *rpcclient.MessageRpcClient + msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient successCount int @@ -39,19 +39,17 @@ type Pusher struct { var errNoOfflinePusher = errors.New("no offlinePusher is configured") func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, - groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher { - msgClient := rpcclient.NewMessageRpcClient(discov) - conversationRpcClient := rpcclient.NewConversationRpcClient(discov) - groupRpcClient := rpcclient.NewGroupRpcClient(discov) + groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, + conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient) *Pusher { return &Pusher{ discov: discov, database: database, offlinePusher: offlinePusher, groupLocalCache: groupLocalCache, conversationLocalCache: conversationLocalCache, - msgClient: &msgClient, - conversationRpcClient: &conversationRpcClient, - groupRpcClient: &groupRpcClient, + msgRpcClient: msgRpcClient, + conversationRpcClient: conversationRpcClient, + groupRpcClient: groupRpcClient, } } @@ -70,7 +68,7 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) - maxSeq, err := p.msgClient.GetConversationMaxSeq(ctx, conevrsationID) + maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) if err != nil { return err } diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index 3cb647217..3c188c90e 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" ) @@ -13,7 +12,7 @@ type ConversationLocalCache struct { lock sync.Mutex superGroupRecvMsgNotNotifyUserIDs map[string]Hash conversationIDs map[string]Hash - client *rpcclient.Conversation + client *rpcclient.ConversationRpcClient } type Hash struct { @@ -21,11 +20,11 @@ type Hash struct { ids []string } -func NewConversationLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache { +func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache { return &ConversationLocalCache{ superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), conversationIDs: make(map[string]Hash), - client: rpcclient.NewConversation(discov), + client: client, } } diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go index 49443412c..ea66d122c 100644 --- a/pkg/common/db/localcache/group.go +++ b/pkg/common/db/localcache/group.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" @@ -13,7 +12,7 @@ import ( type GroupLocalCache struct { lock sync.Mutex cache map[string]GroupMemberIDsHash - client *rpcclient.Group + client *rpcclient.GroupRpcClient } type GroupMemberIDsHash struct { @@ -21,8 +20,7 @@ type GroupMemberIDsHash struct { userIDs []string } -func NewGroupLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *GroupLocalCache { - client := rpcclient.NewGroup(discov) +func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache { return &GroupLocalCache{ cache: make(map[string]GroupMemberIDsHash, 0), client: client, From 4edf0c1ffb1c7b52e41a6f5419d81b78038e40ae Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 30 Jun 2023 14:50:37 +0800 Subject: [PATCH 4/4] middleware checker --- pkg/a2r/api2rpc.go | 9 +++------ pkg/checker/check.go | 9 +++++++++ pkg/common/mw/rpc_server_interceptor.go | 12 ++++++------ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/a2r/api2rpc.go b/pkg/a2r/api2rpc.go index 4cfa3ee4c..50d5b2ac3 100644 --- a/pkg/a2r/api2rpc.go +++ b/pkg/a2r/api2rpc.go @@ -22,12 +22,9 @@ func Call[A, B, C any]( apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) // 参数错误 return } - if check, ok := any(&req).(checker.Checker); ok { - if err := check.Check(); err != nil { - log.ZWarn(c, "custom check error", err, "req", req) - apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数校验失败 - return - } + if err := checker.Validate(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数校验失败 + return } data, err := rpc(client, c, &req) if err != nil { diff --git a/pkg/checker/check.go b/pkg/checker/check.go index f59986ec7..f5913682d 100644 --- a/pkg/checker/check.go +++ b/pkg/checker/check.go @@ -3,3 +3,12 @@ package checker type Checker interface { Check() error } + +func Validate(args any) error { + if checker, ok := args.(Checker); ok { + if err := checker.Check(); err != nil { + return err + } + } + return nil +} diff --git a/pkg/common/mw/rpc_server_interceptor.go b/pkg/common/mw/rpc_server_interceptor.go index 933ab88f1..518b5f0a2 100644 --- a/pkg/common/mw/rpc_server_interceptor.go +++ b/pkg/common/mw/rpc_server_interceptor.go @@ -92,13 +92,13 @@ func RpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary return nil, status.New(codes.InvalidArgument, err.Error()).Err() } } - if err := req.(checker.Checker); err != nil { - if err := err.Check(); err != nil { - return nil, status.New(codes.InvalidArgument, err.Error()).Err() - } - } log.ZInfo(ctx, "rpc server req", "funcName", funcName, "req", rpcString(req)) - resp, err = handler(ctx, req) + resp, err = func() (interface{}, error) { + if err := checker.Validate(req); err != nil { + return nil, err + } + return handler(ctx, req) + }() if err == nil { log.ZInfo(ctx, "rpc server resp", "funcName", funcName, "resp", rpcString(resp)) return resp, nil