diff --git a/pkg/common/discoveryregister/direct/directResolver.go b/pkg/common/discoveryregister/direct/directResolver.go new file mode 100644 index 000000000..8771b8ead --- /dev/null +++ b/pkg/common/discoveryregister/direct/directResolver.go @@ -0,0 +1,50 @@ +package direct + +import ( + "google.golang.org/grpc/resolver" + "strings" +) + +const ( + slashSeparator = "/" + // EndpointSepChar is the separator char in endpoints. + EndpointSepChar = ',' + + subsetSize = 32 + scheme = "Direct" +) + +type ResolverDirect struct { +} + +func NewResolverDirect() *ResolverDirect { + return &ResolverDirect{} +} + +func (rd *ResolverDirect) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) ( + resolver.Resolver, error) { + endpoints := strings.FieldsFunc(GetEndpoints(target), func(r rune) bool { + return r == EndpointSepChar + }) + endpoints = subset(endpoints, subsetSize) + addrs := make([]resolver.Address, 0, len(endpoints)) + + for _, val := range endpoints { + addrs = append(addrs, resolver.Address{ + Addr: val, + }) + } + if err := cc.UpdateState(resolver.State{ + Addresses: addrs, + }); err != nil { + return nil, err + } + + return &nopResolver{cc: cc}, nil +} +func init() { + resolver.Register(&ResolverDirect{}) +} +func (rd *ResolverDirect) Scheme() string { + return scheme // return your custom scheme name +} diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discoveryregister/direct/directconn.go index 4c7cd0a90..98dc37acd 100644 --- a/pkg/common/discoveryregister/direct/directconn.go +++ b/pkg/common/discoveryregister/direct/directconn.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/log" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "google.golang.org/grpc" "google.golang.org/grpc/resolver" @@ -36,6 +35,7 @@ type ConnManager struct { additionalOpts []grpc.DialOption currentServiceAddress string conns map[string][]*grpc.ClientConn + resolverDirect *ResolverDirect } func (cm *ConnManager) GetClientLocalConns() map[string][]*grpc.ClientConn { @@ -72,7 +72,8 @@ func (cm *ConnManager) Close() { func NewConnManager() (*ConnManager, error) { return &ConnManager{ - conns: make(map[string][]*grpc.ClientConn), + conns: make(map[string][]*grpc.ClientConn), + resolverDirect: NewResolverDirect(), }, nil } @@ -85,7 +86,7 @@ func (cm *ConnManager) GetConns(ctx context.Context, ports := getServiceAddresses()[serviceName] var connections []*grpc.ClientConn for _, port := range ports { - conn, err := dialService(ctx, fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", port), append(cm.additionalOpts, opts...)...) + conn, err := cm.dialServiceWithoutResolver(ctx, fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", port), append(cm.additionalOpts, opts...)...) if err != nil { fmt.Errorf("connect to port %s failed,serviceName %s, IP %s", port, serviceName, config2.Config.Rpc.ListenIP) } @@ -114,7 +115,7 @@ func (cm *ConnManager) GetConn(ctx context.Context, serviceName string, opts ... } } // Try to dial a new connection - conn, err := dialService(ctx, result, append(cm.additionalOpts, opts...)...) + conn, err := cm.dialService(ctx, result, append(cm.additionalOpts, opts...)...) if err != nil { return nil, errs.Wrap(err, "address", result) } @@ -138,16 +139,24 @@ func (cm *ConnManager) CloseConn(conn *grpc.ClientConn) { } } -func dialService(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func (cm *ConnManager) dialService(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { options := append(opts, grpc.WithInsecure()) // Replace WithInsecure with proper security options - conn, err := grpc.DialContext(ctx, "mycustomscheme:///"+address, options...) + conn, err := grpc.DialContext(ctx, cm.resolverDirect.Scheme()+":///"+address, options...) if err != nil { return nil, err } return conn, nil } +func (cm *ConnManager) dialServiceWithoutResolver(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + options := append(opts, grpc.WithInsecure()) // Replace WithInsecure with proper security options + conn, err := grpc.DialContext(ctx, address, options...) + if err != nil { + return nil, err + } + return conn, nil +} func checkServiceHealth(address string) bool { conn, err := net.DialTimeout("tcp", address, time.Second*3) if err != nil { @@ -157,14 +166,6 @@ func checkServiceHealth(address string) bool { return true } -const ( - slashSeparator = "/" - // EndpointSepChar is the separator char in endpoints. - EndpointSepChar = ',' - - subsetSize = 32 -) - // GetEndpoints returns the endpoints from the given target. func GetEndpoints(target resolver.Target) string { return strings.Trim(target.URL.Path, slashSeparator) @@ -191,32 +192,3 @@ func (n nopResolver) ResolveNow(options resolver.ResolveNowOptions) { func (n nopResolver) Close() { } - -func (cm *ConnManager) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) ( - resolver.Resolver, error) { - log.ZDebug(context.Background(), "Builder", "target", target) - endpoints := strings.FieldsFunc(GetEndpoints(target), func(r rune) bool { - return r == EndpointSepChar - }) - endpoints = subset(endpoints, subsetSize) - addrs := make([]resolver.Address, 0, len(endpoints)) - - for _, val := range endpoints { - addrs = append(addrs, resolver.Address{ - Addr: val, - }) - } - if err := cc.UpdateState(resolver.State{ - Addresses: addrs, - }); err != nil { - return nil, err - } - - return &nopResolver{cc: cc}, nil -} -func init() { - resolver.Register(&ConnManager{}) -} -func (cm *ConnManager) Scheme() string { - return "mycustomscheme" // return your custom scheme name -}