fix : test for rpc handler and server side

pull/946/head
pizihao 3 years ago
parent 695a78b2c6
commit c08c59553c

@ -17,18 +17,19 @@
package cn.hippo4j.rpc.client; package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.request.DefaultRequest; import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection; import cn.hippo4j.rpc.server.ServerConnection;
import cn.hippo4j.rpc.support.ClassRegistry; import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.support.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.support.Instance; import cn.hippo4j.rpc.discovery.Instance;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -40,8 +41,8 @@ import java.util.concurrent.TimeUnit;
public class RPCClientTest { public class RPCClientTest {
String host = "localhost"; String host = "localhost";
int port = 8888; ServerPort port = new TestServerPort();
int portTest = 8889; ServerPort portTest = new TestPortServerPort();
@Test @Test
public void connection() throws IOException { public void connection() throws IOException {
@ -51,15 +52,15 @@ public class RPCClientTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); CompletableFuture.runAsync(rpcServer::bind);
try { try {
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler); NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection); RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
@ -84,15 +85,15 @@ public class RPCClientTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(portTest, connection); RPCServer rpcServer = new RPCServer(connection, portTest);
CompletableFuture.runAsync(rpcServer::bind); CompletableFuture.runAsync(rpcServer::bind);
try { try {
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler); NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection); RPCClient rpcClient = new RPCClient(clientConnection);
Class<?>[] classes = new Class<?>[2]; Class<?>[] classes = new Class<?>[2];
@ -109,4 +110,20 @@ public class RPCClientTest {
rpcClient.close(); rpcClient.close();
rpcServer.close(); rpcServer.close();
} }
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
static class TestPortServerPort implements ServerPort {
@Override
public int getPort() {
return 8889;
}
}
} }

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.discovery;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;

@ -15,12 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor; import cn.hippo4j.common.web.exception.IllegalException;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -38,18 +35,26 @@ public class DefaultInstanceTest {
@Test @Test
public void testGetInstance() { public void testGetInstance() {
String className = "cn.hippo4j.rpc.support.DefaultInstanceTest$InstanceModel"; String className = "cn.hippo4j.rpc.discovery.InstanceModel";
Object instanceInstance = instance.getInstance(className); Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance); Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName()); Assert.assertEquals(className, instanceInstance.getClass().getName());
} }
@Setter @Test(expected = IllegalException.class)
@Getter public void testGetInstanceTest() {
@AllArgsConstructor String className = "cn.hippo4j.rpc.discovery.InstanceModelTest";
@NoArgsConstructor Object instanceInstance = instance.getInstance(className);
public static class InstanceModel { Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());
}
String name; @Test
public void getInstanceTest() {
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
Object instanceInstance = instance.getInstance(cls);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(InstanceServerLoaderImpl.class, instanceInstance.getClass());
} }
} }

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.boot.test.context.TestComponent;
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@TestComponent
public class InstanceModel {
String name;
}

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
public interface InstanceServerLoader {
String getName();
}

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class InstanceServerLoaderImpl implements InstanceServerLoader {
String name = "name";
}

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import cn.hippo4j.common.config.ApplicationContextHolder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {InstanceModel.class, ApplicationContextHolder.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class SpringContextInstanceTest {
Instance instance = new SpringContextInstance();
@Test
public void getInstance() {
Object obj = instance.getInstance(InstanceModel.class);
Assert.assertNotNull(obj);
Assert.assertEquals(obj.getClass(), InstanceModel.class);
}
@Test
public void testGetInstance() {
Object obj = instance.getInstance("instanceModel");
Assert.assertNotNull(obj);
Assert.assertEquals(obj.getClass(), InstanceModel.class);
}
}

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.discovery.*;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.support.NettyProxyCenter;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class ConnectHandlerTest {
@Test
public void handlerTest() {
// server
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
ClassRegistry.put(cls.getName(), cls);
ServerPort port = () -> 8888;
Instance instance = new DefaultInstance();
NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance);
AbstractNettyServerConnection connection = new AbstractNettyServerConnection(serverHandler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection("localhost", port, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
InstanceServerLoader loader = NettyProxyCenter.getProxy(rpcClient, cls, "localhost", port);
String name = loader.getName();
Assert.assertEquals("name", name);
}
}

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import io.netty.channel.ChannelHandler;
import org.junit.Assert;
import org.junit.Test;
public class NettyClientPoolHandlerTest {
@Test
public void testGetHandlerEntity() {
TestHandler handler = new TestHandler();
long order = 0;
String name = "Test";
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
Assert.assertEquals(entity.getName(), name);
Assert.assertEquals(entity.getOrder(), order);
Assert.assertEquals(entity.getHandler(), handler);
}
@Test
public void testCompareTo() {
TestHandler handler = new TestHandler();
long order = 0;
String name = "Test";
TestHandler handler1 = new TestHandler();
long order1 = 1;
String name1 = "Test1";
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
HandlerManager.HandlerEntity<ChannelHandler> entity1 = poolHandler.getHandlerEntity(order1, handler1, name1);
int compare = entity.compareTo(entity1);
Assert.assertTrue(compare < 0);
}
@Test
public void addLast() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast(new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void addFirst() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst(new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddLast() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddFirst() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
}

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
public class TestHandler implements ChannelHandler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.model;
import cn.hippo4j.rpc.discovery.InstanceServerLoaderImpl;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
import java.lang.reflect.Method;
public class DefaultRequestTest {
@Test
public void testReadObject() throws IOException, ClassNotFoundException, NoSuchMethodException {
String key = "name";
String clsName = InstanceServerLoaderImpl.class.getName();
Method method = InstanceServerLoaderImpl.class.getMethod("setName", String.class);
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = new Object[1];
parameters[0] = "hippo4j";
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, parameters);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
outputStream.writeObject(request);
outputStream.flush();
bytes = byteArrayOutputStream.toByteArray();
}
Request request1;
try (
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
request1 = (Request) objectInputStream.readObject();
}
Assert.assertEquals(request1.hashCode(), request1.hashCode());
Assert.assertEquals(key, request1.getKey());
Assert.assertEquals(clsName, request1.getClassName());
Assert.assertEquals(methodName, request1.getMethodName());
Assert.assertArrayEquals(parameterTypes, request1.getParameterTypes());
Assert.assertArrayEquals(parameters, request1.getParameters());
Assert.assertEquals(request1, request);
}
}

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.model;
import cn.hippo4j.common.web.exception.IllegalException;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
public class DefaultResponseTest {
@Test
public void testReadObject() throws IOException, ClassNotFoundException {
String key = "name";
Object o = "obj";
Class<?> cls = String.class;
Response response = new DefaultResponse(key, cls, o);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
outputStream.writeObject(response);
outputStream.flush();
bytes = byteArrayOutputStream.toByteArray();
}
Response response1;
try (
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
response1 = (Response) objectInputStream.readObject();
}
Assert.assertEquals(response1.hashCode(), response.hashCode());
Assert.assertEquals(key, response1.getKey());
Assert.assertEquals(o, response1.getObj());
Assert.assertEquals(cls, response1.getCls());
Assert.assertEquals(response1, response);
Assert.assertFalse(response1.isErr());
}
@Test
public void testWriteObject() throws IOException, ClassNotFoundException {
String key = "name";
Throwable throwable = new IllegalException("test throwable");
String errMsg = "test throwable";
Response response = new DefaultResponse(key, throwable, errMsg);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
outputStream.writeObject(response);
outputStream.flush();
bytes = byteArrayOutputStream.toByteArray();
}
Response response1;
try (
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
response1 = (Response) objectInputStream.readObject();
}
Assert.assertEquals(key, response1.getKey());
Assert.assertThrows(IllegalException.class, () -> {
throw response1.getThrowable();
});
Assert.assertEquals(response1.hashCode(), response.hashCode());
Assert.assertEquals(errMsg, response1.getErrMsg());
Assert.assertEquals(response1, response);
Assert.assertTrue(response1.isErr());
}
}

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.handler.TestHandler;
import org.junit.Assert;
import org.junit.Test;
public class NettyServerConnectionTest {
@Test
public void addLast() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addLast(new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
@Test
public void addFirst() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addFirst(new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
@Test
public void testAddLast() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addLast("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
@Test
public void testAddFirst() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addFirst("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
}

@ -17,9 +17,10 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.support.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.support.Instance; import cn.hippo4j.rpc.discovery.Instance;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Assert; import org.junit.Assert;
@ -31,14 +32,14 @@ import java.util.concurrent.TimeUnit;
public class RPCServerTest { public class RPCServerTest {
public static int port = 8888; public static ServerPort port = new TestServerPort();
@Test @Test
public void bind() throws IOException { public void bind() throws IOException {
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); CompletableFuture.runAsync(rpcServer::bind);
try { try {
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
@ -58,8 +59,8 @@ public class RPCServerTest {
EventLoopGroup leader = new NioEventLoopGroup(); EventLoopGroup leader = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler); ServerConnection connection = new AbstractNettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(port, connection); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); CompletableFuture.runAsync(rpcServer::bind);
try { try {
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3);
@ -69,4 +70,11 @@ public class RPCServerTest {
rpcServer.close(); rpcServer.close();
} }
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
} }

@ -17,7 +17,8 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -29,7 +30,7 @@ import org.junit.Test;
public class NettyConnectPoolHolderTest { public class NettyConnectPoolHolderTest {
String host = "127.0.0.1"; String host = "127.0.0.1";
int port = 8888; ServerPort port = new TestServerPort();
int maxCount = 8; int maxCount = 8;
int timeout = 5000; int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup();
@ -37,7 +38,7 @@ public class NettyConnectPoolHolderTest {
@Test @Test
public void createPool() { public void createPool() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler); NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler);
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(pool, connectPool); Assert.assertEquals(pool, connectPool);
@ -48,7 +49,7 @@ public class NettyConnectPoolHolderTest {
@Test @Test
public void testGetPool() { public void testGetPool() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool); Assert.assertEquals(connectPool1, connectPool);
@ -59,7 +60,7 @@ public class NettyConnectPoolHolderTest {
@Test @Test
public void remove() { public void remove() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool); Assert.assertEquals(connectPool1, connectPool);
@ -67,4 +68,12 @@ public class NettyConnectPoolHolderTest {
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertNull(connectPool2); Assert.assertNull(connectPool2);
} }
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
} }

@ -17,10 +17,13 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection; import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -38,7 +41,7 @@ import java.util.concurrent.TimeUnit;
public class NettyConnectPoolTest { public class NettyConnectPoolTest {
String host = "127.0.0.1"; String host = "127.0.0.1";
int port = 8888; ServerPort port = new TestServerPort();
int maxCount = 64; int maxCount = 64;
int timeout = 5000; int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup();
@ -49,8 +52,8 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here // Given the delay in starting the server, wait here
try { try {
@ -58,7 +61,7 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout); Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire); Assert.assertNotNull(acquire);
@ -71,8 +74,8 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here // Given the delay in starting the server, wait here
try { try {
@ -80,7 +83,7 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Future<Channel> acquire = pool.acquire(); Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire); Assert.assertNotNull(acquire);
@ -92,8 +95,8 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here // Given the delay in starting the server, wait here
try { try {
@ -102,7 +105,7 @@ public class NettyConnectPoolTest {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout); Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire); Assert.assertNotNull(acquire);
@ -110,4 +113,12 @@ public class NettyConnectPoolTest {
pool.close(); pool.close();
rpcServer.close(); rpcServer.close();
} }
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
} }

@ -18,32 +18,35 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException; import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class NettyProxyCenterTest { public class NettyProxyCenterTest {
ServerPort port = new TestServerPort();
@Test @Test
public void getProxy() { public void getProxy() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler); ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", port, handler);
Assert.assertNotNull(localhost); Assert.assertNotNull(localhost);
} }
@Test(expected = IllegalException.class) @Test(expected = IllegalException.class)
public void getProxyTest() { public void getProxyTest() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888, handler); ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", port, handler);
Assert.assertNotNull(localhost); Assert.assertNotNull(localhost);
} }
@Test(expected = ConnectionException.class) @Test(expected = ConnectionException.class)
public void getProxyTestCall() { public void getProxyTestCall() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler); ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", port, handler);
localhost.hello(); localhost.hello();
Assert.assertNotNull(localhost); Assert.assertNotNull(localhost);
} }
@ -56,4 +59,12 @@ public class NettyProxyCenterTest {
static class ProxyClass { static class ProxyClass {
} }
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
} }

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.InstanceServerLoader;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class NettyServerSupportTest {
@Test
public void bind() throws IOException {
NettyServerSupport support = new NettyServerSupport(() -> 8888, InstanceServerLoader.class);
CompletableFuture.runAsync(support::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assert.assertTrue(support.isActive());
support.close();
Assert.assertFalse(support.isActive());
}
}
Loading…
Cancel
Save