From 048d4b83e125adf15f637b4696e7d85d8cd16e19 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 13 Feb 2023 17:44:22 +0800 Subject: [PATCH] rpc start --- cmd/rpc/group/main.go | 34 ++--- internal/rpc/group/group.go | 206 ++++++++++++++++-------------- internal/startrpc/start.go | 66 ++++++++++ pkg/common/db/cache/redis.go | 6 + pkg/common/db/unrelation/mongo.go | 6 + 5 files changed, 206 insertions(+), 112 deletions(-) create mode 100644 internal/startrpc/start.go diff --git a/cmd/rpc/group/main.go b/cmd/rpc/group/main.go index 8a5f26cfd..ad9ba151c 100644 --- a/cmd/rpc/group/main.go +++ b/cmd/rpc/group/main.go @@ -2,25 +2,25 @@ package main import ( "Open_IM/internal/rpc/group" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImGroupPort - rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port") - flag.Parse() - fmt.Println("start group rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := group.NewGroupServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + //defaultPorts := config.Config.RpcPort.OpenImGroupPort + //rpcPort := flag.Int("port", defaultPorts[0], "get RpcGroupPort from cmd,default 16000 as port") + //prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.GroupPrometheusPort[0], "groupPrometheusPort default listen port") + //flag.Parse() + //fmt.Println("start group rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") + //rpcServer := group.NewGroupServer(*rpcPort) + //go func() { + // err := promePkg.StartPromeSrv(*prometheusPort) + // if err != nil { + // panic(err) + // } + //}() + //rpcServer.Run() + + startrpc.StartRpc(config.Config.RpcPort.OpenImGroupPort[0], config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort[0], group.Start) + } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 397c5fc63..85119fd82 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -2,18 +2,13 @@ package group import ( "Open_IM/internal/common/check" - "Open_IM/internal/common/network" chat "Open_IM/internal/rpc/msg" - "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/db/unrelation" - "Open_IM/pkg/common/log" - "Open_IM/pkg/common/middleware" - promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" discoveryRegistry "Open_IM/pkg/discoveryregistry" @@ -22,107 +17,128 @@ import ( "Open_IM/pkg/utils" "context" "fmt" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" "gorm.io/gorm" "math/big" "math/rand" - "net" "strconv" "strings" "time" - - "github.com/OpenIMSDK/openKeeper" - "google.golang.org/grpc" ) -type groupServer struct { - rpcPort int - rpcRegisterName string - schema string - zkAddr []string - GroupInterface controller.GroupInterface - registerCenter discoveryRegistry.SvcDiscoveryRegistry - user *check.UserCheck +func Start(server *grpc.Server) { + //err := mysql.InitConn().AutoMigrateModel(&groupModel) + //if err != nil { + // panic("db init err:" + err.Error()) + //} + //mongo.InitMongo() + //redis.InitRedis() + pbGroup.RegisterGroupServer(server, &groupServer{ + GroupInterface: controller.NewGroupInterface(nil, cache.NewRedis().GetClient(), unrelation.NewMongo().GetClient()), + registerCenter: nil, + user: check.NewUserCheck(), + }) } -func NewGroupServer(port int) *groupServer { - log.NewPrivateLog(constant.LogFileName) - g := groupServer{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName, - schema: config.Config.Zookeeper.Schema, - zkAddr: config.Config.Zookeeper.ZkAddr, - } - //mysql init - var mysql relation.Mysql - var mongo unrelation.Mongo - var groupModel relationTb.GroupModel - var redis cache.RedisClient - err := mysql.InitConn().AutoMigrateModel(&groupModel) - if err != nil { - panic("db init err:" + err.Error()) - } - mongo.InitMongo() - redis.InitRedis() - mongo.CreateSuperGroupIndex() - zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") - if err != nil { - panic(err.Error()) - } - registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) - g.registerCenter = zkClient - err = g.registerCenter.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, port) - if err != nil { - panic(err.Error()) - } - - //conns, err := g.registerCenter.GetConns(config.Config.RpcRegisterName.OpenImConversationName) - g.GroupInterface = controller.NewGroupInterface(mysql.GormConn(), redis.GetClient(), mongo.GetClient()) - g.user = check.NewUserCheck() - return &g +type groupServer struct { + //rpcPort int + //rpcRegisterName string + //schema string + //zkAddr []string + GroupInterface controller.GroupInterface + registerCenter discoveryRegistry.SvcDiscoveryRegistry + user *check.UserCheck } -func (s *groupServer) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "group rpc start ") - address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) - //listener network - listener, err := net.Listen("tcp", address) - if err != nil { - panic("listening err:" + err.Error() + s.rpcRegisterName) - } - log.NewInfo(operationID, "listen network success, ", address, listener) - - defer listener.Close() - //grpc server - recvSize := 1024 * 1024 * constant.GroupRPCRecvSize - sendSize := 1024 * 1024 * constant.GroupRPCSendSize - var grpcOpts = []grpc.ServerOption{ - grpc.MaxRecvMsgSize(recvSize), - grpc.MaxSendMsgSize(sendSize), - grpc.UnaryInterceptor(middleware.RpcServerInterceptor), - } - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - //Service registers with etcd - pbGroup.RegisterGroupServer(srv, s) - err = srv.Serve(listener) - if err != nil { - log.NewError(operationID, "Serve failed ", err.Error()) - return - } - log.NewInfo(operationID, "group rpc success") -} +// +//type groupServer struct { +// rpcPort int +// rpcRegisterName string +// schema string +// zkAddr []string +// GroupInterface controller.GroupInterface +// registerCenter discoveryRegistry.SvcDiscoveryRegistry +// user *check.UserCheck +//} +// +//func NewGroupServer(port int) *groupServer { +// log.NewPrivateLog(constant.LogFileName) +// g := groupServer{ +// rpcPort: port, +// rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName, +// schema: config.Config.Zookeeper.Schema, +// zkAddr: config.Config.Zookeeper.ZkAddr, +// } +// //mysql init +// var mysql relation.Mysql +// var mongo unrelation.Mongo +// var groupModel relationTb.GroupModel +// var redis cache.RedisClient +// err := mysql.InitConn().AutoMigrateModel(&groupModel) +// if err != nil { +// panic("db init err:" + err.Error()) +// } +// mongo.InitMongo() +// redis.InitRedis() +// mongo.CreateSuperGroupIndex() +// zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") +// if err != nil { +// panic(err.Error()) +// } +// registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) +// g.registerCenter = zkClient +// err = g.registerCenter.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, port) +// if err != nil { +// panic(err.Error()) +// } +// +// //conns, err := g.registerCenter.GetConns(config.Config.RpcRegisterName.OpenImConversationName) +// g.GroupInterface = controller.NewGroupInterface(mysql.GormConn(), redis.GetClient(), mongo.GetClient()) +// g.user = check.NewUserCheck() +// return &g +//} +// +//func (s *groupServer) Run() { +// operationID := utils.OperationIDGenerator() +// log.NewInfo(operationID, "group rpc start ") +// address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) +// //listener network +// listener, err := net.Listen("tcp", address) +// if err != nil { +// panic("listening err:" + err.Error() + s.rpcRegisterName) +// } +// log.NewInfo(operationID, "listen network success, ", address, listener) +// +// defer listener.Close() +// //grpc server +// recvSize := 1024 * 1024 * constant.GroupRPCRecvSize +// sendSize := 1024 * 1024 * constant.GroupRPCSendSize +// var grpcOpts = []grpc.ServerOption{ +// grpc.MaxRecvMsgSize(recvSize), +// grpc.MaxSendMsgSize(sendSize), +// grpc.UnaryInterceptor(middleware.RpcServerInterceptor), +// } +// if config.Config.Prometheus.Enable { +// promePkg.NewGrpcRequestCounter() +// promePkg.NewGrpcRequestFailedCounter() +// promePkg.NewGrpcRequestSuccessCounter() +// grpcOpts = append(grpcOpts, []grpc.ServerOption{ +// // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), +// grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), +// grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), +// }...) +// } +// srv := grpc.NewServer(grpcOpts...) +// defer srv.GracefulStop() +// //Service registers with etcd +// pbGroup.RegisterGroupServer(srv, s) +// err = srv.Serve(listener) +// if err != nil { +// log.NewError(operationID, "Serve failed ", err.Error()) +// return +// } +// log.NewInfo(operationID, "group rpc success") +//} func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error { if !tokenverify.IsAppManagerUid(ctx) { @@ -145,7 +161,7 @@ func (s *groupServer) GetUsernameMap(ctx context.Context, userIDs []string, comp if err != nil { return nil, err } - return utils.SliceToMapAny(users, func(e *open_im_sdk.PublicUserInfo) (string, string) { + return utils.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, string) { return e.UserID, e.Nickname }), nil } diff --git a/internal/startrpc/start.go b/internal/startrpc/start.go new file mode 100644 index 000000000..841ee3cbc --- /dev/null +++ b/internal/startrpc/start.go @@ -0,0 +1,66 @@ +package startrpc + +import ( + "Open_IM/internal/common/network" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/log" + "Open_IM/pkg/common/middleware" + promePkg "Open_IM/pkg/common/prometheus" + "flag" + "fmt" + "github.com/OpenIMSDK/openKeeper" + grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" + "net" +) + +func StartRpc(rpcPort int, rpcRegisterName string, prometheusPort int, fn func(server *grpc.Server), options ...grpc.ServerOption) { + flagRpcPort := flag.Int("port", rpcPort, "get RpcGroupPort from cmd,default 16000 as port") + flagPrometheusPort := flag.Int("prometheus_port", prometheusPort, "groupPrometheusPort default listen port") + flag.Parse() + rpcPort = *flagRpcPort + prometheusPort = *flagPrometheusPort + fmt.Println("start group rpc server, port: ", rpcPort, ", OpenIM version: ", constant.CurrentVersion) + log.NewPrivateLog(constant.LogFileName) + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, rpcPort)) + if err != nil { + panic(err) + } + defer listener.Close() + zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") + if err != nil { + panic(err.Error()) + } + registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) + if err != nil { + panic(err) + } + options = append(options, grpc.UnaryInterceptor(middleware.RpcServerInterceptor)) // ctx 中间件 + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + options = append(options, []grpc.ServerOption{ + // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) + } + srv := grpc.NewServer(options...) + defer srv.GracefulStop() + fn(srv) + err = zkClient.Register(rpcRegisterName, registerIP, rpcPort) + if err != nil { + panic(err.Error()) + } + if config.Config.Prometheus.Enable { + err := promePkg.StartPromeSrv(prometheusPort) + if err != nil { + panic(err) + } + } + if err := srv.Serve(listener); err != nil { + panic(err) + } +} diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 82523eb0f..fbafdd83e 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -50,6 +50,12 @@ type Cache interface { // native redis operate +func NewRedis() *RedisClient { + o := &RedisClient{} + o.InitRedis() + return o +} + type RedisClient struct { rdb redis.UniversalClient } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 75ae8c1b5..7500214ae 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -13,6 +13,12 @@ import ( "time" ) +func NewMongo() *Mongo { + mgo := &Mongo{} + mgo.InitMongo() + return mgo +} + type Mongo struct { db *mongo.Client }