diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 425bdb369..34712c609 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -112,18 +112,29 @@ type rpcAuth struct { func NewRpcAuthServer(port int) *rpcAuth { log.NewPrivateLog(constant.LogFileName) - return &rpcAuth{ + s := &rpcAuth{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImAuthName, } + + zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") + if err != nil { + panic(err.Error()) + } + registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) + err = zkClient.Register(s.rpcRegisterName, registerIP, s.rpcPort) + if err != nil { + panic(err.Error()) + } + s.registerCenter = zkClient + return s + } func (s *rpcAuth) Run() { operationID := utils.OperationIDGenerator() log.NewInfo(operationID, "rpc auth start...") - - listenIP := network.GetListenIP(config.Config.ListenIP) - address := listenIP + ":" + strconv.Itoa(s.rpcPort) + address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) listener, err := net.Listen("tcp", address) if err != nil { panic("listening err:" + err.Error() + s.rpcRegisterName) @@ -148,16 +159,6 @@ func (s *rpcAuth) Run() { //service registers with etcd pbAuth.RegisterAuthServer(srv, s) - zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") - if err != nil { - panic(err.Error()) - } - registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) - err = zkClient.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, s.rpcPort) - if err != nil { - panic(err.Error()) - } - s.registerCenter = zkClient log.NewInfo(operationID, "RegisterAuthServer ok ", s.etcdSchema, strings.Join(s.etcdAddr, ","), registerIP, s.rpcPort, s.rpcRegisterName) err = srv.Serve(listener) if err != nil { diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 7dcd88e18..3a4860237 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -1,7 +1,9 @@ package friend import ( + "Open_IM/internal/common/check" "Open_IM/internal/common/convert" + "Open_IM/internal/common/network" chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" @@ -13,18 +15,15 @@ import ( promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/tracelog" + discoveryRegistry "Open_IM/pkg/discovery_registry" pbFriend "Open_IM/pkg/proto/friend" - pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" + "github.com/OpenIMSDK/openKeeper" grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" "net" "strconv" - "strings" - - "Open_IM/internal/common/check" - "github.com/OpenIMSDK/getcdv3" - "google.golang.org/grpc" ) type friendServer struct { @@ -35,7 +34,7 @@ type friendServer struct { controller.FriendInterface controller.BlackInterface - userRpc pbUser.UserClient + registerCenter discoveryRegistry.SvcDiscoveryRegistry } func NewFriendServer(port int) *friendServer { @@ -43,22 +42,18 @@ func NewFriendServer(port int) *friendServer { f := friendServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImFriendName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, } - ttl := 10 - etcdClient, err := getcdv3.NewEtcdConn(config.Config.Etcd.EtcdSchema, strings.Join(f.etcdAddr, ","), config.Config.RpcRegisterIP, config.Config.Etcd.UserName, config.Config.Etcd.Password, port, ttl) + + zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") if err != nil { - panic("NewEtcdConn failed" + err.Error()) + panic(err.Error()) } - err = etcdClient.RegisterEtcd("", f.rpcRegisterName) + registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) + err = zkClient.Register(f.rpcRegisterName, registerIP, f.rpcPort) if err != nil { - panic("NewEtcdConn failed" + err.Error()) + panic(err.Error()) } - - etcdClient.SetDefaultEtcdConfig(config.Config.RpcRegisterName.OpenImUserName, config.Config.RpcPort.OpenImUserPort) - conn := etcdClient.GetConn("", config.Config.RpcRegisterName.OpenImUserName) - f.userRpc = pbUser.NewUserClient(conn) + f.registerCenter = zkClient //mysql init var mysql relation.Mysql @@ -87,22 +82,16 @@ func NewFriendServer(port int) *friendServer { } func (s *friendServer) Run() { - log.NewInfo("0", "friendServer run...") - - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(s.rpcPort) + operationID := utils.OperationIDGenerator() + log.NewInfo(operationID, "friendServer run...") + address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) //listener network listener, err := net.Listen("tcp", address) if err != nil { panic("listening err:" + err.Error() + s.rpcRegisterName) } - log.NewInfo("0", "listen ok ", address) + log.NewInfo(operationID, "listen ok ", address) defer listener.Close() //grpc server var grpcOpts []grpc.ServerOption @@ -122,7 +111,7 @@ func (s *friendServer) Run() { pbFriend.RegisterFriendServer(srv, s) err = srv.Serve(listener) if err != nil { - log.NewError("0", "Serve failed ", err.Error(), listener) + log.NewError(operationID, "Serve failed ", err.Error(), listener) return } } @@ -240,7 +229,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri } // ok -func (s *friendServer) GetDesignatedFriendsReq(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) { +func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) { resp = &pbFriend.GetDesignatedFriendsResp{} if err := check.Access(ctx, req.UserID); err != nil { return nil, err diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index f9d52a888..b97096ec3 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -80,20 +80,15 @@ func NewGroupServer(port int) *groupServer { } func (s *groupServer) Run() { - log.NewInfo("", "group rpc start ") - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(s.rpcPort) + operationID := utils.OperationIDGenerator() + log.NewInfo(operationID, "group rpc start ") + address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) //listener network listener, err := net.Listen("tcp", address) if err != nil { panic("listening err:" + err.Error() + s.rpcRegisterName) } - log.NewInfo("", "listen network success, ", address, listener) + log.NewInfo(operationID, "listen network success, ", address, listener) defer listener.Close() //grpc server @@ -120,10 +115,10 @@ func (s *groupServer) Run() { pbGroup.RegisterGroupServer(srv, s) err = srv.Serve(listener) if err != nil { - log.NewError("", "Serve failed ", err.Error()) + log.NewError(operationID, "Serve failed ", err.Error()) return } - log.NewInfo("", "group rpc success") + log.NewInfo(operationID, "group rpc success") } func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 5849bf05f..c5e48dfd7 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -2,6 +2,7 @@ package user import ( "Open_IM/internal/common/convert" + "Open_IM/internal/common/network" chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" @@ -12,10 +13,12 @@ import ( promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/tracelog" + discoveryRegistry "Open_IM/pkg/discovery_registry" server_api_params "Open_IM/pkg/proto/sdk_ws" pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" + "github.com/OpenIMSDK/openKeeper" "net" "strconv" "strings" @@ -31,6 +34,7 @@ type userServer struct { etcdSchema string etcdAddr []string controller.UserInterface + registerCenter discoveryRegistry.SvcDiscoveryRegistry } func NewUserServer(port int) *userServer { @@ -38,9 +42,19 @@ func NewUserServer(port int) *userServer { u := userServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImUserName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, } + + zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") + if err != nil { + panic(err.Error()) + } + registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) + err = zkClient.Register(u.rpcRegisterName, registerIP, u.rpcPort) + if err != nil { + panic(err.Error()) + } + u.registerCenter = zkClient + //mysql init var mysql relation.Mysql var model relation.UserGorm @@ -59,14 +73,7 @@ func NewUserServer(port int) *userServer { func (s *userServer) Run() { log.NewInfo("", "rpc user start...") - - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(s.rpcPort) + address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) //listener network listener, err := net.Listen("tcp", address)