dubbo spi 机制 过滤器、负载均衡等

pull/254/head
xjs 3 years ago
parent 0f22307dc1
commit 2015e88eec

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-project</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>dubbo过滤器</name>
<artifactId>dubbo-spi-filter</artifactId>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,33 @@
package com.xjs.filter;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
/**
* dubbo
*
* @author xiejs
* @since 2022-05-23
*/
@Activate(group = {CommonConstants.CONSUMER})
public class DubboInvokeFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
long startTime = 0L;
long endTime = 0L;
try {
startTime = System.currentTimeMillis();
Result invoke = invoker.invoke(invocation);
endTime = System.currentTimeMillis();
return invoke;
} finally {
String serviceName = invocation.getServiceName();
System.out.println("serviceName---invoke time : " + (endTime - startTime) + "ms");
}
}
}

@ -13,6 +13,7 @@ public class DogHelloService implements HelloService {
return "你在狗叫什么!";
}
@Override
public String sayHello(URL url) {
return "wa wa url";

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-project</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>dubbo负载均衡器</name>
<artifactId>dubbo-spi-loadbalance</artifactId>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,33 @@
package com.xjs;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import java.util.List;
/**
* dubbo
* @author xiejs
* @since 2022-05-24
*/
public class LoadBalancer implements LoadBalance {
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
//按照 IP + 端口排序
Invoker<T> tInvoker = invokers.stream().sorted((i1, i2) -> {
final int ipCompare = i1.getUrl().getIp().compareTo(i2.getUrl().getIp());
if (ipCompare == 0) {
return Integer.compare(i1.getUrl().getPort(), i2.getUrl().getPort());
}
return ipCompare;
}).findFirst().get();
return tInvoker;
}
}

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-project</artifactId>
<groupId>com.xjs</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>dubbo线程池</name>
<artifactId>dubbo-spi-shreadpool</artifactId>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-common</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,61 @@
package com.xjs.thread;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author xiejs
* @since 2022-05-24
*/
public class WatchingThreadPool extends FixedThreadPool implements Runnable {
private static final Logger log = LoggerFactory.getLogger(WatchingThreadPool.class);
//定义阈值
private static final double ALARM_PERCENT = 0.90;
//存储map
private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();
public WatchingThreadPool() {
//每隔三秒打印线程使用情况
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS);
}
//通过父类创建线程池
@Override
public Executor getExecutor(URL url) {
final Executor executor = super.getExecutor(url);
if (executor instanceof ThreadPoolExecutor) {
THREAD_POOLS.put(url, (ThreadPoolExecutor) executor);
}
return executor;
}
@Override
public void run() {
//遍历线程池
for (Map.Entry<URL, ThreadPoolExecutor> entry : THREAD_POOLS.entrySet()) {
final URL url = entry.getKey();
final ThreadPoolExecutor executor = entry.getValue();
//开始计算相关指标
final int activeCount = executor.getActiveCount();
final int poolSize = executor.getCorePoolSize();
double usedPercent = activeCount / (poolSize * 1.0);
log.info("线程池运行状况:[{}/{}:{}%]",activeCount,poolSize,usedPercent);
if (usedPercent > ALARM_PERCENT) {
log.warn("超过警戒值URL:{}",url);
}
}
}
}

@ -17,6 +17,9 @@
<module>dubbo-spi-api</module>
<module>dubbo-spi-impl</module>
<module>dubbo-spi-main</module>
<module>dubbo-spi-filter</module>
<module>dubbo-spi-loadbalance</module>
<module>dubbo-spi-threadpool</module>
</modules>
<artifactId>dubbo-project</artifactId>

@ -6,4 +6,6 @@ package com.xjs.service;
*/
public interface HelloService {
String sayHello(String name);
String sayHello(String name,int timeTowait);
}

@ -24,6 +24,18 @@
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.xjs</groupId>
<artifactId>dubbo-spi-filter</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.xjs</groupId>
<artifactId>dubbo-spi-loadbalance</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>

@ -23,12 +23,27 @@ public class AnnotationConsumerApplication {
ConsumerComponent service = context.getBean(ConsumerComponent.class);
while (true) {
System.in.read();
String hello = service.sayHello("傻逼");
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(5);
new Thread(new Runnable() {
@Override
public void run() {
String msg = service.sayHello("hello");
System.out.println(msg);
}
}).start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(hello);
}
//String msg = service.sayHello("hello");
//System.out.println(msg);
}
@Configuration

@ -0,0 +1,37 @@
package com.xjs;
import com.xjs.service.HelloService;
import org.apache.dubbo.rpc.RpcContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @author xiejs
* @since 2022-05-24
*/
public class XmlConsumerApplication {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
HelloService helloService = context.getBean(HelloService.class);
while (true) {
System.in.read();
String word = helloService.sayHello("word", 500
);
//利用Future 模式来获取
Future<Object> future = RpcContext.getContext().getFuture();
System.out.println("future result:"+future.get());
System.out.println(word);
}
}
}

@ -11,11 +11,11 @@ import org.springframework.stereotype.Component;
@Component
public class ConsumerComponent {
@Reference
@Reference(loadbalance = "onlyFirst")
private HelloService helloService;
public String sayHello(String name) {
return helloService.sayHello(name);
return helloService.sayHello(name,10);
}
}

@ -0,0 +1,15 @@
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<dubbo:application name="service-consumer"/>
<dubbo:registry address="nacos://127.0.0.1:8848"/>
<dubbo:reference id="helloService" interface="com.xjs.service.HelloService">
<dubbo:method name="sayHello" async="true">
<dubbo:argument type="String"/>
<dubbo:argument type="int"/>
</dubbo:method>
</dubbo:reference>
<context:component-scan base-package="com.xjs.bean"/>
</beans>

@ -24,6 +24,12 @@
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.xjs</groupId>
<artifactId>dubbo-spi-shreadpool</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>

@ -0,0 +1,35 @@
package com.xjs;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import java.io.IOException;
/**
* @author xiejs
* @since 2022-05-23
*/
public class DubboApplication2 {
public static void main(String[] args) throws IOException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
context.start();
System.in.read();
}
@Configuration
@EnableDubbo(scanBasePackages = "com.xjs.service")
@PropertySource("classpath:/dubbo-provider2.properties")
static class ProviderConfiguration{
@Bean
public RegistryConfig registryConfig() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("nacos://127.0.0.1:8848");
return registryConfig;
}
}
}

@ -0,0 +1,35 @@
package com.xjs;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import java.io.IOException;
/**
* @author xiejs
* @since 2022-05-23
*/
public class DubboApplication3 {
public static void main(String[] args) throws IOException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
context.start();
System.in.read();
}
@Configuration
@EnableDubbo(scanBasePackages = "com.xjs.service")
@PropertySource("classpath:/dubbo-provider3.properties")
static class ProviderConfiguration{
@Bean
public RegistryConfig registryConfig() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("nacos://127.0.0.1:8848");
return registryConfig;
}
}
}

@ -2,6 +2,8 @@ package com.xjs.service;
import org.apache.dubbo.config.annotation.Service;
import java.util.concurrent.TimeUnit;
/**
* @author xiejs
* @since 2022-05-23
@ -12,4 +14,19 @@ public class HelloServiceImpl implements HelloService{
public String sayHello(String name) {
return "Hello:"+name;
}
@Override
public String sayHello(String name, int timeTowait) {
try {
TimeUnit.SECONDS.sleep(1);
Thread.sleep(timeTowait);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello:"+name;
}
}

@ -4,5 +4,9 @@ dubbo.protocol.name=dubbo
dubbo.protocol.port=7711
dubbo.protocol.host=localhost
dubbo.provider.threadpool=watching

@ -0,0 +1,9 @@
dubbo.application.name=service-provider
dubbo.protocol.name=dubbo
dubbo.protocol.port=7712
dubbo.protocol.host=localhost

@ -0,0 +1,9 @@
dubbo.application.name=service-provider
dubbo.protocol.name=dubbo
dubbo.protocol.port=7713
dubbo.protocol.host=localhost
Loading…
Cancel
Save