pull/458/head
wangchuxiao 2 years ago
parent fca3058db9
commit a4f87de272

@ -60,6 +60,9 @@ func run(port int) error {
if err != nil { if err != nil {
return err return err
} }
if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return err
}
fmt.Println("api init discov client success") fmt.Println("api init discov client success")
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
if err := yaml.NewEncoder(buf).Encode(config.Config); err != nil { if err := yaml.NewEncoder(buf).Encode(config.Config); err != nil {

@ -53,6 +53,9 @@ func StartTransfer(prometheusPort int) error {
if err != nil { if err != nil {
return err return err
} }
if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return err
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
msgModel := cache.NewMsgCacheModel(rdb) msgModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())

@ -348,6 +348,13 @@ type Notification struct {
ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"`
} }
func GetServiceNames() []string {
return []string{Config.RpcRegisterName.OpenImUserName, Config.RpcRegisterName.OpenImFriendName, Config.RpcRegisterName.OpenImMsgName,
Config.RpcRegisterName.OpenImPushName, Config.RpcRegisterName.OpenImMessageGatewayName, Config.RpcRegisterName.OpenImGroupName,
Config.RpcRegisterName.OpenImAuthName, Config.RpcRegisterName.OpenImConversationName, Config.RpcRegisterName.OpenImRtcName,
Config.RpcRegisterName.OpenImThirdName}
}
func GetOptionsByNotification(cfg NotificationConf) utils.Options { func GetOptionsByNotification(cfg NotificationConf) utils.Options {
opts := utils.NewOptions() opts := utils.NewOptions()
if cfg.UnreadCount { if cfg.UnreadCount {

@ -20,6 +20,7 @@ type SvcDiscoveryRegistry interface {
Conn Conn
Register(serviceName, host string, port int, opts ...grpc.DialOption) error Register(serviceName, host string, port int, opts ...grpc.DialOption) error
UnRegister() error UnRegister() error
CreateRpcRootNodes(serviceNames []string) error
RegisterConf2Registry(key string, conf []byte) error RegisterConf2Registry(key string, conf []byte) error
GetConfFromRegistry(key string) ([]byte, error) GetConfFromRegistry(key string) ([]byte, error)
} }

@ -10,7 +10,7 @@ import (
func (s *ZkClient) CreateRpcRootNodes(serviceNames []string) error { func (s *ZkClient) CreateRpcRootNodes(serviceNames []string) error {
for _, serviceName := range serviceNames { for _, serviceName := range serviceNames {
if err := s.ensureName(serviceName); err != nil { if err := s.ensureName(serviceName); err != nil && err != zk.ErrNodeExists {
return err return err
} }
} }

@ -26,7 +26,7 @@ func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {
} }
r.addrs = newConns r.addrs = newConns
if err := r.cc.UpdateState(resolver.State{Addresses: newConns}); err != nil { if err := r.cc.UpdateState(resolver.State{Addresses: newConns}); err != nil {
log.ZError(context.Background(), "UpdateState error", err, "conns", newConns) log.ZError(context.Background(), "UpdateState error, conns is nil from svr", err, "conns", newConns)
return return
} }
log.ZDebug(context.Background(), "resolve now finished", "target", r.target, "conns", r.addrs) log.ZDebug(context.Background(), "resolve now finished", "target", r.target, "conns", r.addrs)

@ -26,14 +26,17 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c
return err return err
} }
defer listener.Close() defer listener.Close()
zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName, openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName,
config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
if err != nil { if err != nil {
return utils.Wrap1(err) return utils.Wrap1(err)
} }
defer zkClient.CloseZK() if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) return err
}
defer client.CloseZK()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP)
if err != nil { if err != nil {
return err return err
@ -53,11 +56,11 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c
} }
srv := grpc.NewServer(options...) srv := grpc.NewServer(options...)
defer srv.GracefulStop() defer srv.GracefulStop()
err = rpcFn(zkClient, srv) err = rpcFn(client, srv)
if err != nil { if err != nil {
return utils.Wrap1(err) return utils.Wrap1(err)
} }
err = zkClient.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials())) err = client.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil { if err != nil {
return utils.Wrap1(err) return utils.Wrap1(err)
} }

Loading…
Cancel
Save