From 8a21e07d6d75caafb882eafd676ee6b0db123b5e Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 25 May 2023 20:57:19 +0800 Subject: [PATCH] fix: user logout recycle resources --- internal/msggateway/message_handler.go | 15 ++++++++---- internal/msggateway/n_ws_server.go | 9 +++---- pkg/rpcclient/push.go | 33 ++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 8 deletions(-) create mode 100644 pkg/rpcclient/push.go diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 6f630e5c9..6eb936ab0 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -2,6 +2,8 @@ package msggateway import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" @@ -50,11 +52,13 @@ var _ MessageHandler = (*GrpcHandler)(nil) type GrpcHandler struct { msgRpcClient *rpcclient.MsgClient + pushClient *rpcclient.PushClient validate *validator.Validate } -func NewGrpcHandler(validate *validator.Validate, msgRpcClient *rpcclient.MsgClient) *GrpcHandler { - return &GrpcHandler{msgRpcClient: msgRpcClient, validate: validate} +func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry) *GrpcHandler { + return &GrpcHandler{msgRpcClient: rpcclient.NewMsgClient(client), + pushClient: rpcclient.NewPushClient(client), validate: validate} } func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) { @@ -137,8 +141,11 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data Req) ([] } func (g GrpcHandler) UserLogout(context context.Context, data Req) ([]byte, error) { - //todo - resp, err := g.msgRpcClient.PullMessageBySeqList(context, nil) + req := push.DelUserPushTokenReq{} + if err := proto.Unmarshal(data.Data, &req); err != nil { + return nil, err + } + resp, err := g.pushClient.DelUserPushToken(context, req) if err != nil { return nil, err } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index a4999bdb6..f11891d1e 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -2,6 +2,7 @@ package msggateway import ( "errors" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "net/http" "sync" "sync/atomic" @@ -10,7 +11,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/go-playground/validator/v10" ) @@ -21,7 +21,8 @@ type LongConnServer interface { GetUserAllCons(userID string) ([]*Client, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) Validate(s interface{}) error - SetMessageHandler(msgRpcClient *rpcclient.MsgClient) + //SetMessageHandler(msgRpcClient *rpcclient.MsgClient) + SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) UnRegister(c *Client) Compressor Encoder @@ -51,8 +52,8 @@ type WsServer struct { MessageHandler } -func (ws *WsServer) SetMessageHandler(rpcClient *rpcclient.MsgClient) { - ws.MessageHandler = NewGrpcHandler(ws.validate, rpcClient) +func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) { + ws.MessageHandler = NewGrpcHandler(ws.validate, client) } func (ws *WsServer) UnRegister(c *Client) { diff --git a/pkg/rpcclient/push.go b/pkg/rpcclient/push.go new file mode 100644 index 000000000..9904c8d2d --- /dev/null +++ b/pkg/rpcclient/push.go @@ -0,0 +1,33 @@ +package rpcclient + +import ( + "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" +) + +type PushClient struct { + MetaClient +} + +func NewPushClient(client discoveryregistry.SvcDiscoveryRegistry) *PushClient { + return &PushClient{ + MetaClient: MetaClient{ + client: client, + rpcRegisterName: config.Config.RpcRegisterName.OpenImPushName, + }, + } +} + +func (p *PushClient) DelUserPushToken(ctx context.Context, req push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { + cc, err := p.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := push.NewPushMsgServiceClient(cc).DelUserPushToken(ctx, &req) + if err != nil { + return nil, err + } + return resp, nil +}