diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index 9fd1af471..94da8cc10 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit 9fd1af471356fa122fb87587fb9501e9292fb416 +Subproject commit 94da8cc1074e9b6d14a94a41bf37885b27253a2d diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 6e84ac25b..fa0729eb1 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "runtime/debug" @@ -119,11 +121,7 @@ func (c *Client) handleMessage(message []byte) error { if binaryReq.SendID != c.userID { return errors.New("exception conn userID not same to req userID") } - ctx := context.Background() - ctx = context.WithValue(ctx, ConnID, c.ctx.GetConnID()) - ctx = context.WithValue(ctx, OperationID, binaryReq.OperationID) - ctx = context.WithValue(ctx, CommonUserID, binaryReq.SendID) - ctx = context.WithValue(ctx, PlatformID, c.platformID) + ctx := mcontext.WithMustInfoCtx([]string{binaryReq.OperationID, binaryReq.SendID, constant.PlatformIDToName(c.platformID), c.ctx.GetConnID()}) var messageErr error var resp []byte switch binaryReq.ReqIdentifier { diff --git a/pkg/common/mw/rpc_client_interceptor.go b/pkg/common/mw/rpc_client_interceptor.go index eda323836..9c8757e6c 100644 --- a/pkg/common/mw/rpc_client_interceptor.go +++ b/pkg/common/mw/rpc_client_interceptor.go @@ -23,15 +23,44 @@ func rpcClientInterceptor(ctx context.Context, method string, req, resp interfac return errs.ErrInternalServer.Wrap("call rpc request context is nil") } log.ZInfo(ctx, "rpc client req", "funcName", method, "req", rpcString(req)) + ctx, err = getRpcContext(ctx, method) + if err != nil { + return err + } + err = invoker(ctx, method, req, resp, cc, opts...) + if err == nil { + log.ZInfo(ctx, "rpc client resp", "funcName", method, "resp", rpcString(resp)) + return nil + } + log.ZError(ctx, "rpc resp error", err) + rpcErr, ok := err.(interface{ GRPCStatus() *status.Status }) + if !ok { + return errs.ErrInternalServer.Wrap(err.Error()) + } + sta := rpcErr.GRPCStatus() + if sta.Code() == 0 { + return errs.NewCodeError(errs.ServerInternalError, err.Error()).Wrap() + } + if details := sta.Details(); len(details) > 0 { + errInfo, ok := details[0].(*errinfo.ErrorInfo) + if ok { + s := strings.Join(errInfo.Warp, "->") + errInfo.Cause + return errs.NewCodeError(int(sta.Code()), sta.Message()).WithDetail(s).Wrap() + } + } + return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap() +} + +func getRpcContext(ctx context.Context, method string) (context.Context, error) { md := metadata.Pairs() if keys, _ := ctx.Value(constant.RpcCustomHeader).([]string); len(keys) > 0 { for _, key := range keys { val, ok := ctx.Value(key).([]string) if !ok { - return errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx missing key %s", key)) + return nil, errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx missing key %s", key)) } if len(val) == 0 { - return errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx key %s value is empty", key)) + return nil, errs.ErrInternalServer.Wrap(fmt.Sprintf("ctx key %s value is empty", key)) } md.Set(key, val...) } @@ -40,41 +69,24 @@ func rpcClientInterceptor(ctx context.Context, method string, req, resp interfac operationID, ok := ctx.Value(constant.OperationID).(string) if !ok { log.ZWarn(ctx, "ctx missing operationID", errors.New("ctx missing operationID"), "funcName", method) - return errs.ErrArgs.Wrap("ctx missing operationID") + return nil, errs.ErrArgs.Wrap("ctx missing operationID") } md.Set(constant.OperationID, operationID) - args := make([]string, 0, 4) - args = append(args, constant.OperationID, operationID) + var checkArgs []string + checkArgs = append(checkArgs, constant.OperationID, operationID) opUserID, ok := ctx.Value(constant.OpUserID).(string) if ok { md.Set(constant.OpUserID, opUserID) - args = append(args, constant.OpUserID, opUserID) + checkArgs = append(checkArgs, constant.OpUserID, opUserID) } opUserIDPlatformID, ok := ctx.Value(constant.OpUserPlatform).(string) if ok { md.Set(constant.OpUserPlatform, opUserIDPlatformID) } - md.Set(constant.CheckKey, genReqKey(args)) - err = invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...) - if err == nil { - log.ZInfo(ctx, "rpc client resp", "funcName", method, "resp", rpcString(resp)) - return nil - } - log.ZError(ctx, "rpc resp error", err) - rpcErr, ok := err.(interface{ GRPCStatus() *status.Status }) - if !ok { - return errs.ErrInternalServer.Wrap(err.Error()) - } - sta := rpcErr.GRPCStatus() - if sta.Code() == 0 { - return errs.NewCodeError(errs.ServerInternalError, err.Error()).Wrap() - } - if details := sta.Details(); len(details) > 0 { - errInfo, ok := details[0].(*errinfo.ErrorInfo) - if ok { - s := strings.Join(errInfo.Warp, "->") + errInfo.Cause - return errs.NewCodeError(int(sta.Code()), sta.Message()).WithDetail(s).Wrap() - } + connID, ok := ctx.Value(constant.ConnID).(string) + if ok { + md.Set(constant.ConnID, connID) } - return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap() + md.Set(constant.CheckKey, genReqKey(checkArgs)) + return metadata.NewOutgoingContext(ctx, md), nil } diff --git a/pkg/common/mw/rpc_server_interceptor.go b/pkg/common/mw/rpc_server_interceptor.go index f47a4321b..e923f1319 100644 --- a/pkg/common/mw/rpc_server_interceptor.go +++ b/pkg/common/mw/rpc_server_interceptor.go @@ -79,6 +79,9 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary if opts := md.Get(constant.OpUserPlatform); len(opts) == 1 { ctx = context.WithValue(ctx, constant.OpUserPlatform, opts[0]) } + if opts := md.Get(constant.ConnID); len(opts) == 1 { + ctx = context.WithValue(ctx, constant.ConnID, opts[0]) + } if opts := md.Get(constant.CheckKey); len(opts) != 1 || opts[0] == "" { return nil, status.New(codes.InvalidArgument, "check key empty").Err() } else {