学习---自定义rpc框架

pull/254/head
xjs 4 years ago
parent ff4379a520
commit 47e5fef626

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>custom-rpc</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>自定义RPC框架-API</name>
<artifactId>lg-rpc-api</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!--netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!--json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<!--lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,17 @@
package com.lagou.rpc.api;
import com.lagou.rpc.pojo.User;
/**
*
*/
public interface IUserService {
/**
* ID
*
* @param id
* @return
*/
User getById(int id);
}

@ -0,0 +1,34 @@
package com.lagou.rpc.common;
import lombok.Data;
import java.util.Arrays;
/**
*
*/
@Data
public class RpcRequest {
/**
* ID
*/
private String requestId;
/**
*
*/
private String className;
/**
*
*/
private String methodName;
/**
*
*/
private Class<?>[] parameterTypes;
/**
*
*/
private Object[] parameters;
}

@ -0,0 +1,26 @@
package com.lagou.rpc.common;
import lombok.Data;
/**
*
*/
@Data
public class RpcResponse {
/**
* ID
*/
private String requestId;
/**
*
*/
private String error;
/**
*
*/
private Object result;
}

@ -0,0 +1,9 @@
package com.lagou.rpc.pojo;
import lombok.Data;
@Data
public class User {
private int id;
private String name;
}

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>custom-rpc</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>自定义RPC框架-消费者</name>
<artifactId>lg-rpc-consumer</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.xjs</groupId>
<artifactId>lg-rpc-api</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,23 @@
package com.lagou.rpc.consumer;
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.consumer.proxy.RpcClientProxy;
import com.lagou.rpc.pojo.User;
/**
*
* @author xiejs
* @since 2022-03-10
*/
public class ClientBootStrap {
public static void main(String[] args) {
IUserService userService = (IUserService) RpcClientProxy.createProxy(IUserService.class);
User user = userService.getById(1);
System.out.println(user);
}
}

@ -0,0 +1,127 @@
package com.lagou.rpc.consumer.client;
import com.lagou.rpc.consumer.handler.RpcClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*
* <ol>
* <li>Netty</li>
* <li></li>
* <li></li>
* </ol>
*
* @author xiejs
* @since 2022-03-10
*/
@Data
@NoArgsConstructor
public class RpcClient {
private String ip;
private Integer port;
private NioEventLoopGroup group;
private Channel channel;
private RpcClientHandler clientHandler = new RpcClientHandler();
private ExecutorService executor = Executors.newCachedThreadPool();
public RpcClient(String ip, Integer port) {
this.ip = ip;
this.port = port;
this.initClient();
}
/**
* -Netty
*/
public void initClient() {
try {
//1、创建线程组
group = new NioEventLoopGroup();
//2、创建启动助手
Bootstrap bootstrap = new Bootstrap();
//3、设置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//String类型编解码器
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//添加客户端处理类
pipeline.addLast(clientHandler);
}
});
//连接netty服务端
channel = bootstrap.connect(ip, port).sync().channel();
} catch (Exception e) {
e.printStackTrace();
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
}
/**
*
*/
public void close() {
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
/**
*
*
* @return
*/
public Object send(String msg) throws ExecutionException, InterruptedException {
clientHandler.setRequestMsg(msg);
Future future = executor.submit(clientHandler);
return future.get();
}
}

@ -0,0 +1,73 @@
package com.lagou.rpc.consumer.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.Data;
import org.springframework.stereotype.Component;
import java.util.concurrent.Callable;
/**
*
* <ol>
* <li></li>
* <li></li>
* </ol>
* @author xiejs
* @since 2022-03-10
*/
@Data
@Component
public class RpcClientHandler extends SimpleChannelInboundHandler<String> implements Callable {
private ChannelHandlerContext ctx;
/**
*
*/
String requestMsg;
String responseMsg;
/**
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
/**
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected synchronized void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
responseMsg = msg;
//唤醒等待的线程
notify();
}
@Override
public synchronized Object call() throws Exception {
//消息发送
ctx.writeAndFlush(requestMsg);
//线程等待
wait();
return responseMsg;
}
}

@ -0,0 +1,68 @@
package com.lagou.rpc.consumer.proxy;
import com.alibaba.fastjson.JSON;
import com.lagou.rpc.common.RpcRequest;
import com.lagou.rpc.common.RpcResponse;
import com.lagou.rpc.consumer.client.RpcClient;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
/**
* -
* <ol>
* <li>request</li>
* <li>RpcClient</li>
* <li></li>
* <li></li>
* </ol>
*
* @author xiejs
* @since 2022-03-10
*/
public class RpcClientProxy {
public static Object createProxy(Class serviceClass) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//1、封装request对象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
//2、封装RpcClient对象
RpcClient rpcClient = new RpcClient("127.0.0.1", 8887);
try {
//3、发送消息
Object responseMsg = rpcClient.send(JSON.toJSONString(rpcRequest));
RpcResponse rpcResponse = JSON.parseObject(responseMsg.toString(), RpcResponse.class);
if (rpcResponse.getError() != null) {
throw new RuntimeException(rpcResponse.getError());
}
//4、返回结构
Object result = rpcResponse.getResult();
return JSON.parseObject(result.toString(), method.getReturnType());
} catch (Exception e) {
throw e;
} finally {
rpcClient.close();
}
}
}
);
}
}

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>custom-rpc</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>自定义RPC框架-生产者</name>
<artifactId>lg-rpc-provider</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.xjs</groupId>
<artifactId>lg-rpc-api</artifactId>
<version>1.0</version>
</dependency>
<!--spring相关依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,32 @@
package com.lagou.rpc.provider;
import com.lagou.rpc.provider.server.RpcServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author xiejs
* @since 2022-03-09
*/
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
private RpcServer rpcServer;
public static void main(String[] args) {
SpringApplication.run(ServerBootstrapApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
rpcServer.startServer("127.0.0.1",8887);
}
}).start();
}
}

@ -0,0 +1,18 @@
package com.lagou.rpc.provider.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*
* @author xiejs
* @since 2022-03-09
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {
}

@ -0,0 +1,120 @@
package com.lagou.rpc.provider.handler;
import com.alibaba.fastjson.JSON;
import com.lagou.rpc.common.RpcRequest;
import com.lagou.rpc.common.RpcResponse;
import com.lagou.rpc.provider.annotation.RpcService;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
*
* </p>
*
* <ul>
* <li>@RpcServicebean</li>
* <li></li>
* <li>beanNamebean</li>
* <li> </li>
* <li>bean</li>
* <li></li>
* </ul>
*
* @author xiejs
* @since 2022-03-09
*/
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
private static final Map SERVICE_INSTANCE_MAP = new ConcurrentHashMap();
/**
*
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//1、接收客户端请求--将msg转化RpcRequest对象
RpcRequest request = JSON.parseObject(msg, RpcRequest.class);
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try {
response.setResult(handler(request));
} catch (Exception e) {
response.setError(e.getMessage());
e.printStackTrace();
}
//给客户端进行响应
ctx.writeAndFlush(JSON.toJSONString(response));
}
/**
*
*
* @return obj
*/
private Object handler(RpcRequest request) throws InvocationTargetException {
//根据传递过来的beanName从缓存中查找到对应的bean
Object serviceBean = SERVICE_INSTANCE_MAP.get(request.getClassName());
if (serviceBean == null) {
throw new RuntimeException("根据beanName找不到服务beanName" + request.getClassName());
}
//解析请求中的方法名称,参数类型 参数信息
Class<?> serviceBeanClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
//反射调用bean的方法
FastClass fastClass = FastClass.create(serviceBeanClass);
FastMethod method = fastClass.getMethod(methodName, parameterTypes);
return method.invoke(serviceBean, parameters);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if (serviceMap != null && serviceMap.size() > 0) {
Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();
for (Map.Entry<String, Object> item : entries) {
Object serviceBean = item.getValue();
if (serviceBean.getClass().getInterfaces().length == 0) {
throw new RuntimeException("服务必须实现接口");
}
//默认取第一个接口作为缓存bean的名称
String name = serviceBean.getClass().getInterfaces()[0].getName();
SERVICE_INSTANCE_MAP.put(name, serviceBean);
}
}
}
}

@ -0,0 +1,89 @@
package com.lagou.rpc.provider.server;
import com.lagou.rpc.provider.handler.RpcServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* netty
*
* @author xiejs
* @since 2022-03-09
*/
@Component
public class RpcServer implements DisposableBean {
@Autowired
private RpcServerHandler serverHandler;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
public void startServer(String ip, Integer port) {
try {
//1、创建线程组
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//2、创建服务端启动助手
ServerBootstrap bootstrap = new ServerBootstrap();
//3、设置参数
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加String的编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//添加业务处理类
pipeline.addLast(serverHandler);
}
});
//4、绑定端口
ChannelFuture future = bootstrap.bind(ip, port).sync();
System.out.println("--------服务端启动成功----------");
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
*
* @throws Exception
*/
@Override
public void destroy() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}

@ -0,0 +1,30 @@
package com.lagou.rpc.provider.service;
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.pojo.User;
import com.lagou.rpc.provider.annotation.RpcService;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@RpcService
public class UserServiceImpl implements IUserService {
Map<Object, User> userMap = new HashMap();
@Override
public User getById(int id) {
if (userMap.size() == 0) {
User user1 = new User();
user1.setId(1);
user1.setName("张三");
User user2 = new User();
user2.setId(2);
user2.setName("李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
return userMap.get(id);
}
}

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>netty-project</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>自定义RPC框架</name>
<artifactId>custom-rpc</artifactId>
<packaging>pom</packaging>
<modules>
<module>lg-rpc-consumer</module>
<module>lg-rpc-provider</module>
<module>lg-rpc-api</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<curator.version>4.3.0</curator.version>
</properties>
</project>

@ -4,13 +4,12 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>netty-project</artifactId>
<groupId>com.ruoyi</groupId>
<version>3.3.0</version>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>Netty聊天室网页版</name>
<groupId>com.xjs</groupId>
<artifactId>netty-springboot</artifactId>
<properties>
@ -24,12 +23,6 @@
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

@ -4,14 +4,15 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xjs-study</artifactId>
<groupId>com.ruoyi</groupId>
<version>3.3.0</version>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<name>Netty项目</name>
<modules>
<module>netty-springboot</module>
<module>custom-rpc</module>
</modules>
<artifactId>netty-project</artifactId>

@ -16,6 +16,8 @@
</modules>
<artifactId>xjs-study</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>

@ -4,13 +4,12 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xjs-study</artifactId>
<groupId>com.ruoyi</groupId>
<version>3.3.0</version>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>学习-基础内容</name>
<groupId>com.xjs</groupId>
<artifactId>xjs-study-base</artifactId>
<properties>

Loading…
Cancel
Save