From 6aaf79c5173b2f326550bf113b211a27763c1486 Mon Sep 17 00:00:00 2001 From: AndrewZuo01 Date: Tue, 16 Jan 2024 12:11:26 +0800 Subject: [PATCH] feat: multiple address --- .../discoveryregister/direct/directconn.go | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discoveryregister/direct/directconn.go index c045764fb..4f521ef5e 100644 --- a/pkg/common/discoveryregister/direct/directconn.go +++ b/pkg/common/discoveryregister/direct/directconn.go @@ -7,30 +7,32 @@ import ( "github.com/OpenIMSDK/tools/errs" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "google.golang.org/grpc" + "math/rand" "net" "time" ) -type ServiceAddresses map[string]string +type ServiceAddresses map[string][]int func getServiceAddresses() ServiceAddresses { return ServiceAddresses{ - config2.Config.RpcRegisterName.OpenImUserName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImUserPort[0]), - config2.Config.RpcRegisterName.OpenImFriendName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImFriendPort[0]), - config2.Config.RpcRegisterName.OpenImMsgName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImMessagePort[0]), - config2.Config.RpcRegisterName.OpenImMessageGatewayName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.LongConnSvr.OpenImMessageGatewayPort[0]), - config2.Config.RpcRegisterName.OpenImGroupName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImGroupPort[0]), - config2.Config.RpcRegisterName.OpenImAuthName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImAuthPort[0]), - config2.Config.RpcRegisterName.OpenImPushName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImPushPort[0]), - config2.Config.RpcRegisterName.OpenImConversationName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImConversationPort[0]), - config2.Config.RpcRegisterName.OpenImThirdName: fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImThirdPort[0]), + config2.Config.RpcRegisterName.OpenImUserName: config2.Config.RpcPort.OpenImUserPort, + config2.Config.RpcRegisterName.OpenImFriendName: config2.Config.RpcPort.OpenImFriendPort, + config2.Config.RpcRegisterName.OpenImMsgName: config2.Config.RpcPort.OpenImMessagePort, + config2.Config.RpcRegisterName.OpenImMessageGatewayName: config2.Config.LongConnSvr.OpenImMessageGatewayPort, + config2.Config.RpcRegisterName.OpenImGroupName: config2.Config.RpcPort.OpenImGroupPort, + config2.Config.RpcRegisterName.OpenImAuthName: config2.Config.RpcPort.OpenImAuthPort, + config2.Config.RpcRegisterName.OpenImPushName: config2.Config.RpcPort.OpenImPushPort, + config2.Config.RpcRegisterName.OpenImConversationName: config2.Config.RpcPort.OpenImConversationPort, + config2.Config.RpcRegisterName.OpenImThirdName: config2.Config.RpcPort.OpenImThirdPort, } } +// fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", config2.Config.RpcPort.OpenImUserPort[0]) type ConnManager struct { additionalOpts []grpc.DialOption currentServiceAddress string - conns map[string]*grpc.ClientConn + conns map[string][]*grpc.ClientConn } func (cm *ConnManager) GetClientLocalConns() map[string][]*grpc.ClientConn { @@ -67,17 +69,24 @@ func (cm *ConnManager) Close() { func NewConnManager() (*ConnManager, error) { return &ConnManager{ - conns: make(map[string]*grpc.ClientConn), + conns: make(map[string][]*grpc.ClientConn), }, nil } func (cm *ConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + if conns, exists := cm.conns[serviceName]; exists { + return conns, nil + } + ports := getServiceAddresses()[serviceName] var connections []*grpc.ClientConn - for name, conn := range cm.conns { - if name == serviceName { - connections = append(connections, conn) + for _, port := range ports { + conn, err := dialService(ctx, fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", port), append(cm.additionalOpts, opts...)...) + if err != nil { + fmt.Errorf("connect to port %s failed,serviceName %s, IP %s", port, serviceName, config2.Config.Rpc.ListenIP) + continue } + connections = append(connections, conn) } if len(connections) == 0 { return nil, fmt.Errorf("no connections found for service: %s", serviceName) @@ -86,20 +95,28 @@ func (cm *ConnManager) GetConns(ctx context.Context, } func (cm *ConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - if conn, exists := cm.conns[serviceName]; exists { - return conn, nil + // Check if there are existing connections for the service + if conns, exists := cm.conns[serviceName]; exists && len(conns) > 0 { + // Return a random connection from the existing connections + randomIndex := rand.Intn(len(conns)) + return conns[randomIndex], nil } + + // Get service addresses addresses := getServiceAddresses() address, ok := addresses[serviceName] if !ok { return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName) } - - conn, err := dialService(ctx, address, append(cm.additionalOpts, opts...)...) + randomIndex := rand.Intn(len(address)) + // Try to dial a new connection + conn, err := dialService(ctx, fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", address[randomIndex]), append(cm.additionalOpts, opts...)...) if err != nil { return nil, err } - cm.conns[serviceName] = conn + + // Store the new connection + cm.conns[serviceName] = append(cm.conns[serviceName], conn) return conn, nil }