diff --git a/cmd/api/main.go b/cmd/api/main.go index 136e4461a..05806f62c 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -60,6 +60,9 @@ func run(port int) error { if err != nil { return err } + if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { + return err + } fmt.Println("api init discov client success") buf := bytes.NewBuffer(nil) if err := yaml.NewEncoder(buf).Encode(config.Config); err != nil { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 40bbcfaa2..8c45a9910 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -53,6 +53,9 @@ func StartTransfer(prometheusPort int) error { if err != nil { return err } + if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { + return err + } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) msgModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index d47889fd5..acfb09a01 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -348,6 +348,13 @@ type Notification struct { ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` } +func GetServiceNames() []string { + return []string{Config.RpcRegisterName.OpenImUserName, Config.RpcRegisterName.OpenImFriendName, Config.RpcRegisterName.OpenImMsgName, + Config.RpcRegisterName.OpenImPushName, Config.RpcRegisterName.OpenImMessageGatewayName, Config.RpcRegisterName.OpenImGroupName, + Config.RpcRegisterName.OpenImAuthName, Config.RpcRegisterName.OpenImConversationName, Config.RpcRegisterName.OpenImRtcName, + Config.RpcRegisterName.OpenImThirdName} +} + func GetOptionsByNotification(cfg NotificationConf) utils.Options { opts := utils.NewOptions() if cfg.UnreadCount { diff --git a/pkg/discoveryregistry/discovery_register.go b/pkg/discoveryregistry/discovery_register.go index 4214ed49b..01ed88d5b 100644 --- a/pkg/discoveryregistry/discovery_register.go +++ b/pkg/discoveryregistry/discovery_register.go @@ -20,6 +20,7 @@ type SvcDiscoveryRegistry interface { Conn Register(serviceName, host string, port int, opts ...grpc.DialOption) error UnRegister() error + CreateRpcRootNodes(serviceNames []string) error RegisterConf2Registry(key string, conf []byte) error GetConfFromRegistry(key string) ([]byte, error) } diff --git a/pkg/discoveryregistry/zookeeper/register.go b/pkg/discoveryregistry/zookeeper/register.go index 7347c4b55..8f38f6d8d 100644 --- a/pkg/discoveryregistry/zookeeper/register.go +++ b/pkg/discoveryregistry/zookeeper/register.go @@ -10,7 +10,7 @@ import ( func (s *ZkClient) CreateRpcRootNodes(serviceNames []string) error { for _, serviceName := range serviceNames { - if err := s.ensureName(serviceName); err != nil { + if err := s.ensureName(serviceName); err != nil && err != zk.ErrNodeExists { return err } } diff --git a/pkg/discoveryregistry/zookeeper/resolver.go b/pkg/discoveryregistry/zookeeper/resolver.go index f41666e15..5d2e3b373 100644 --- a/pkg/discoveryregistry/zookeeper/resolver.go +++ b/pkg/discoveryregistry/zookeeper/resolver.go @@ -26,7 +26,7 @@ func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) { } r.addrs = newConns if err := r.cc.UpdateState(resolver.State{Addresses: newConns}); err != nil { - log.ZError(context.Background(), "UpdateState error", err, "conns", newConns) + log.ZError(context.Background(), "UpdateState error, conns is nil from svr", err, "conns", newConns) return } log.ZDebug(context.Background(), "resolve now finished", "target", r.target, "conns", r.addrs) diff --git a/pkg/startrpc/start.go b/pkg/startrpc/start.go index a821dbbab..d938aaada 100644 --- a/pkg/startrpc/start.go +++ b/pkg/startrpc/start.go @@ -26,14 +26,17 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c return err } defer listener.Close() - zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, + client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName, config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) if err != nil { return utils.Wrap1(err) } - defer zkClient.CloseZK() - zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { + return err + } + defer client.CloseZK() + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) if err != nil { return err @@ -53,11 +56,11 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c } srv := grpc.NewServer(options...) defer srv.GracefulStop() - err = rpcFn(zkClient, srv) + err = rpcFn(client, srv) if err != nil { return utils.Wrap1(err) } - err = zkClient.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials())) + err = client.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return utils.Wrap1(err) }