package msg import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" "Open_IM/pkg/utils" "google.golang.org/grpc" "net" "strconv" "strings" ) type rpcChat struct { rpcPort int rpcRegisterName string etcdSchema string etcdAddr []string producer *kafka.Producer } func NewRpcChatServer(port int) *rpcChat { log.NewPrivateLog(constant.LogFileName) rc := rpcChat{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImOfflineMessageName, etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, } rc.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) return &rc } func (rpc *rpcChat) Run() { log.Info("", "rpcChat init...") listenIP := "" if config.Config.ListenIP == "" { listenIP = "0.0.0.0" } else { listenIP = config.Config.ListenIP } address := listenIP + ":" + strconv.Itoa(rpc.rpcPort) listener, err := net.Listen("tcp", address) if err != nil { panic("listening err:" + err.Error() + rpc.rpcRegisterName) } log.Info("", "listen network success, address ", address) srv := grpc.NewServer() defer srv.GracefulStop() rpcRegisterIP := "" pbChat.RegisterChatServer(srv, rpc) if config.Config.RpcRegisterIP == "" { rpcRegisterIP, err = utils.GetLocalIP() if err != nil { log.Error("", "GetLocalIP failed ", err.Error()) } } err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName, 10) if err != nil { log.Error("", "register rpcChat to etcd failed ", err.Error()) return } err = srv.Serve(listener) if err != nil { log.Error("", "rpc rpcChat failed ", err.Error()) return } log.Info("", "rpc rpcChat init success") }