From 05e66e9f8d309fd878363a991df2cd30592bf804 Mon Sep 17 00:00:00 2001 From: AndrewZuo01 <59896149+AndrewZuo01@users.noreply.github.com> Date: Fri, 26 Jan 2024 14:39:53 +0800 Subject: [PATCH] Feature: add direct connect mode with zookeeper, etcd, and k8s (#1775) * fix: fix the bug * fix: fix the imAdmin permission and searchNoficitaion resp * 2023 Annual Summary Reflections and Aspirations Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com> * fix: dissmissGroup and lack of keyword bug (#1678) * Update docker-start-all.sh * Update env-template.yaml * Update docker-start-all.sh * fix openim config mongo passwd env Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com> * fix: fix some bug * fix: group messages sync failed. * fix: fix the valiable name * fix: fix the getSortedConversation api * fix: fix the mongo search error * fix: GroupApplicationAcceptedNotification (cherry picked from commit 4c3c4555a35ec8e31ffbf3e96a5dba3bceec09ee) * fix: GroupApplicationAcceptedNotification * fix update friends * fix pageFindUser * Delete .devcontainer directory * fix find user * Your commit message here * feat: direct conn * fix: direct conn message gateway array exceed length * fix: direct conn message gateway array exceed length * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: direct conn cannot find name * fix: operation id invalid * feat: multiple address * feat: multiple address * feat: multiple address * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: multiple addresses * feat: direct conn with multiple ports * Update user.go * feat: direct conn with multiple ports * feat: remove checkServiceHealth * feat: update fmt error * feat: update .devcontainer * feat: update .devcontainer * feat: update fmt.Errorf( --------- Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com> Co-authored-by: luhaoling <2198702716@qq.com> Co-authored-by: Xinwei Xiong <3293172751@qq.com> Co-authored-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com> Co-authored-by: Brabem <69128477+luhaoling@users.noreply.github.com> Co-authored-by: OpenIM Bot <124379614+kubbot@users.noreply.github.com> Co-authored-by: OpenIM Robot <139873238+openimbot@users.noreply.github.com> Co-authored-by: Gordon <46924906+FGadvancer@users.noreply.github.com> Co-authored-by: withchao <993506633@qq.com> --- internal/api/route.go | 3 +- internal/msgtransfer/init.go | 2 +- internal/rpc/user/user.go | 4 + internal/tools/msg.go | 2 +- .../direct/directResolver.go | 81 +++++++++ .../discoveryregister/direct/directconn.go | 154 ++++++++++++++++++ .../discoveryregister/discoveryregister.go | 3 + .../discoveryregister_test.go | 1 + .../kubernetes/kubernetes.go | 20 +++ pkg/common/startrpc/start.go | 2 +- 10 files changed, 268 insertions(+), 4 deletions(-) create mode 100644 pkg/common/discoveryregister/direct/directResolver.go create mode 100644 pkg/common/discoveryregister/direct/directconn.go diff --git a/internal/api/route.go b/internal/api/route.go index 3f16d3e50..10907d086 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -16,6 +16,7 @@ package api import ( "context" + "fmt" "net/http" "github.com/OpenIMSDK/protocol/constant" @@ -43,7 +44,7 @@ import ( ) func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient) *gin.Engine { - discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) // 默认RPC中间件 + discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) // 默认RPC中间件 gin.SetMode(gin.ReleaseMode) r := gin.New() if v, ok := binding.Validator.Engine().(*validator.Validate); ok { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index a8d10799f..7d692662d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -66,7 +66,7 @@ func StartTransfer(prometheusPort int) error { if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { return err } - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) msgModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 50c97f41b..981e0ccc4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -61,6 +61,7 @@ type userServer struct { RegisterCenter registry.SvcDiscoveryRegistry } + func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis() if err != nil { @@ -243,6 +244,7 @@ func (s *userServer) GetPaginationUsers(ctx context.Context, req *pbuser.GetPagi return nil, err } return &pbuser.GetPaginationUsersResp{Total: int32(total), Users: convert.UsersDB2Pb(users)}, err + } } @@ -439,6 +441,7 @@ func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.P if err != nil { return nil, err } + return &pbuser.ProcessUserCommandDeleteResp{}, nil } @@ -475,6 +478,7 @@ func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.P } func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) { + err := authverify.CheckAccessV3(ctx, req.UserID) if err != nil { return nil, err diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 97bb2988e..30006670e 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -79,7 +79,7 @@ func InitMsgTool() (*MsgTool, error) { if err != nil { return nil, err } - discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) if err != nil { return nil, err diff --git a/pkg/common/discoveryregister/direct/directResolver.go b/pkg/common/discoveryregister/direct/directResolver.go new file mode 100644 index 000000000..285f55112 --- /dev/null +++ b/pkg/common/discoveryregister/direct/directResolver.go @@ -0,0 +1,81 @@ +package direct + +import ( + "context" + "github.com/OpenIMSDK/tools/log" + "google.golang.org/grpc/resolver" + "math/rand" + "strings" +) + +const ( + slashSeparator = "/" + // EndpointSepChar is the separator char in endpoints. + EndpointSepChar = ',' + + subsetSize = 32 + scheme = "direct" +) + +type ResolverDirect struct { +} + +func NewResolverDirect() *ResolverDirect { + return &ResolverDirect{} +} + +func (rd *ResolverDirect) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) ( + resolver.Resolver, error) { + log.ZDebug(context.Background(), "Build", "target", target) + endpoints := strings.FieldsFunc(GetEndpoints(target), func(r rune) bool { + return r == EndpointSepChar + }) + endpoints = subset(endpoints, subsetSize) + addrs := make([]resolver.Address, 0, len(endpoints)) + + for _, val := range endpoints { + addrs = append(addrs, resolver.Address{ + Addr: val, + }) + } + if err := cc.UpdateState(resolver.State{ + Addresses: addrs, + }); err != nil { + return nil, err + } + + return &nopResolver{cc: cc}, nil +} +func init() { + resolver.Register(&ResolverDirect{}) +} +func (rd *ResolverDirect) Scheme() string { + return scheme // return your custom scheme name +} + +// GetEndpoints returns the endpoints from the given target. +func GetEndpoints(target resolver.Target) string { + return strings.Trim(target.URL.Path, slashSeparator) +} +func subset(set []string, sub int) []string { + rand.Shuffle(len(set), func(i, j int) { + set[i], set[j] = set[j], set[i] + }) + if len(set) <= sub { + return set + } + + return set[:sub] +} + +type nopResolver struct { + cc resolver.ClientConn +} + +func (n nopResolver) ResolveNow(options resolver.ResolveNowOptions) { + +} + +func (n nopResolver) Close() { + +} diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discoveryregister/direct/directconn.go new file mode 100644 index 000000000..3eaa6fa19 --- /dev/null +++ b/pkg/common/discoveryregister/direct/directconn.go @@ -0,0 +1,154 @@ +package direct + +import ( + "context" + "errors" + "fmt" + "github.com/OpenIMSDK/tools/errs" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type ServiceAddresses map[string][]int + +func getServiceAddresses() ServiceAddresses { + return ServiceAddresses{ + config2.Config.RpcRegisterName.OpenImUserName: config2.Config.RpcPort.OpenImUserPort, + config2.Config.RpcRegisterName.OpenImFriendName: config2.Config.RpcPort.OpenImFriendPort, + config2.Config.RpcRegisterName.OpenImMsgName: config2.Config.RpcPort.OpenImMessagePort, + config2.Config.RpcRegisterName.OpenImMessageGatewayName: config2.Config.LongConnSvr.OpenImMessageGatewayPort, + config2.Config.RpcRegisterName.OpenImGroupName: config2.Config.RpcPort.OpenImGroupPort, + config2.Config.RpcRegisterName.OpenImAuthName: config2.Config.RpcPort.OpenImAuthPort, + config2.Config.RpcRegisterName.OpenImPushName: config2.Config.RpcPort.OpenImPushPort, + config2.Config.RpcRegisterName.OpenImConversationName: config2.Config.RpcPort.OpenImConversationPort, + config2.Config.RpcRegisterName.OpenImThirdName: config2.Config.RpcPort.OpenImThirdPort, + } +} + +type ConnDirect struct { + additionalOpts []grpc.DialOption + currentServiceAddress string + conns map[string][]*grpc.ClientConn + resolverDirect *ResolverDirect +} + +func (cd *ConnDirect) GetClientLocalConns() map[string][]*grpc.ClientConn { + return nil +} + +func (cd *ConnDirect) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { + return "", nil +} + +func (cd *ConnDirect) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { + return nil +} + +func (cd *ConnDirect) UnRegister() error { + return nil +} + +func (cd *ConnDirect) CreateRpcRootNodes(serviceNames []string) error { + return nil +} + +func (cd *ConnDirect) RegisterConf2Registry(key string, conf []byte) error { + return nil +} + +func (cd *ConnDirect) GetConfFromRegistry(key string) ([]byte, error) { + return nil, nil +} + +func (cd *ConnDirect) Close() { + +} + +func NewConnDirect() (*ConnDirect, error) { + return &ConnDirect{ + conns: make(map[string][]*grpc.ClientConn), + resolverDirect: NewResolverDirect(), + }, nil +} + +func (cd *ConnDirect) GetConns(ctx context.Context, + serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + + if conns, exists := cd.conns[serviceName]; exists { + return conns, nil + } + ports := getServiceAddresses()[serviceName] + var connections []*grpc.ClientConn + for _, port := range ports { + conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...) + if err != nil { + fmt.Printf("connect to port %d failed,serviceName %s, IP %s\n", port, serviceName, config2.Config.Rpc.ListenIP) + } + connections = append(connections, conn) + } + + if len(connections) == 0 { + return nil, fmt.Errorf("no connections found for service: %s", serviceName) + } + return connections, nil +} + +func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // Get service addresses + addresses := getServiceAddresses() + address, ok := addresses[serviceName] + if !ok { + return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName) + } + var result string + for _, addr := range address { + if result != "" { + result = result + "," + fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", addr) + } else { + result = fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", addr) + } + } + // Try to dial a new connection + conn, err := cd.dialService(ctx, result, append(cd.additionalOpts, opts...)...) + if err != nil { + return nil, errs.Wrap(err, "address", result) + } + + // Store the new connection + cd.conns[serviceName] = append(cd.conns[serviceName], conn) + return conn, nil +} + +func (cd *ConnDirect) GetSelfConnTarget() string { + return cd.currentServiceAddress +} + +func (cd *ConnDirect) AddOption(opts ...grpc.DialOption) { + cd.additionalOpts = append(cd.additionalOpts, opts...) +} + +func (cd *ConnDirect) CloseConn(conn *grpc.ClientConn) { + if conn != nil { + conn.Close() + } +} + +func (cd *ConnDirect) dialService(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.DialContext(ctx, cd.resolverDirect.Scheme()+":///"+address, options...) + + if err != nil { + return nil, err + } + return conn, nil +} +func (cd *ConnDirect) dialServiceWithoutResolver(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.DialContext(ctx, address, options...) + + if err != nil { + return nil, err + } + return conn, nil +} diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index c14323027..76c8fb267 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -16,6 +16,7 @@ package discoveryregister import ( "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct" "os" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" @@ -36,6 +37,8 @@ func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistr return zookeeper.NewZookeeperDiscoveryRegister() case "k8s": return kubernetes.NewK8sDiscoveryRegister() + case "direct": + return direct.NewConnDirect() default: return nil, errors.New("envType not correct") } diff --git a/pkg/common/discoveryregister/discoveryregister_test.go b/pkg/common/discoveryregister/discoveryregister_test.go index d83da1285..5317db5c6 100644 --- a/pkg/common/discoveryregister/discoveryregister_test.go +++ b/pkg/common/discoveryregister/discoveryregister_test.go @@ -40,6 +40,7 @@ func TestNewDiscoveryRegister(t *testing.T) { }{ {"zookeeper", false, true}, {"k8s", false, true}, // 假设 k8s 配置也已正确设置 + {"direct", false, true}, {"invalid", true, false}, } diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go index f27ebc805..7c40399a3 100644 --- a/pkg/common/discoveryregister/kubernetes/kubernetes.go +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -132,20 +132,40 @@ func getMsgGatewayHost(ctx context.Context) []string { // GetConns returns the gRPC client connections to the specified service. func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + // This conditional checks if the serviceName is not the OpenImMessageGatewayName. + // It seems to handle a special case for the OpenImMessageGateway. if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + // DialContext creates a client connection to the given target (serviceName) using the specified context. + // 'cli.options' are likely default or common options for all connections in this struct. + // 'opts...' allows for additional gRPC dial options to be passed and used. conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + + // The function returns a slice of client connections with the new connection, or an error if occurred. return []*grpc.ClientConn{conn}, err } else { + // This block is executed if the serviceName is OpenImMessageGatewayName. + // 'ret' will accumulate the connections to return. var ret []*grpc.ClientConn + + // getMsgGatewayHost presumably retrieves hosts for the message gateway service. + // The context is passed, likely for cancellation and timeout control. gatewayHosts := getMsgGatewayHost(ctx) + + // Iterating over the retrieved gateway hosts. for _, host := range gatewayHosts { + // Establishes a connection to each host. + // Again, appending cli.options with any additional opts provided. conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) + + // If there's an error while dialing any host, the function returns immediately with the error. if err != nil { return nil, err } else { + // If the connection is successful, it is added to the 'ret' slice. ret = append(ret, conn) } } + // After all hosts are processed, the slice of connections is returned. return ret, nil } } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 01076bbbb..8295404d3 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -71,7 +71,7 @@ func Start( } defer client.Close() - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) if err != nil { return err