Refactored example modules

pull/512/head
chen.ma 3 years ago
parent a9f6bb9835
commit d27eeba812

@ -28,5 +28,4 @@ public class Hippo4jCoreApolloExampleApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(Hippo4jCoreApolloExampleApplication.class, args); SpringApplication.run(Hippo4jCoreApolloExampleApplication.class, args);
} }
} }

@ -40,9 +40,8 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=1000 spring.dynamic.thread-pool.executors[0].keep-alive-time=1000
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true spring.dynamic.thread-pool.executors[0].notify.alarm=true
spring.dynamic.thread-pool.executors[0].notify.active-alarm=80 spring.dynamic.thread-pool.executors[0].notify.active-alarm=80
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[0].notify.interval=8 spring.dynamic.thread-pool.executors[0].notify.interval=8
spring.dynamic.thread-pool.executors[0].notify.receives.WECHAT=xxx spring.dynamic.thread-pool.executors[0].notify.receive=chen.ma
spring.dynamic.thread-pool.executors[0].notify.receives.DING=xxx

@ -42,10 +42,8 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=6691 spring.dynamic.thread-pool.executors[0].keep-alive-time=6691
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true spring.dynamic.thread-pool.executors[0].notify.alarm=true
spring.dynamic.thread-pool.executors[0].notify.active-alarm=80 spring.dynamic.thread-pool.executors[0].notify.active-alarm=80
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[0].notify.interval=8 spring.dynamic.thread-pool.executors[0].notify.interval=8
spring.dynamic.thread-pool.executors[0].notify.receives.WECHAT=xxx spring.dynamic.thread-pool.executors[0].notify.receive=chen.ma
spring.dynamic.thread-pool.executors[0].notify.receives.DING=xxx
spring.dynamic.thread-pool.executors[0].notify.receives.LARK=xxx

@ -21,10 +21,6 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Redick01
* @date 2022/3/14 20:40
*/
@EnableDynamicThreadPool @EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core") @SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class Hippo4jCoreZookeeperExampleApplication { public class Hippo4jCoreZookeeperExampleApplication {

@ -33,18 +33,15 @@ import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_CONSUM
import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE; import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE;
/** /**
* Thread pool config. * Dynamic thread-pool config.
*
* @author chen.ma
* @date 2021/6/20 17:16
*/ */
@Slf4j @Slf4j
@Configuration @Configuration
public class ThreadPoolConfig { public class DynamicThreadPoolConfig {
@Bean @Bean
@DynamicThreadPool @DynamicThreadPool
public ThreadPoolExecutor messageConsumeDynamicThreadPool() { public Executor messageConsumeTtlDynamicThreadPool() {
String threadPoolId = MESSAGE_CONSUME; String threadPoolId = MESSAGE_CONSUME;
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder() ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .dynamicPool()
@ -55,7 +52,9 @@ public class ThreadPoolConfig {
.awaitTerminationMillis(5000L) .awaitTerminationMillis(5000L)
.taskDecorator(new TaskTraceBuilderHandler()) .taskDecorator(new TaskTraceBuilderHandler())
.build(); .build();
return customExecutor; // Ali ttl adaptation use case.
Executor ttlExecutor = TtlExecutors.getTtlExecutor(customExecutor);
return ttlExecutor;
} }
@Bean @Bean
@ -70,44 +69,10 @@ public class ThreadPoolConfig {
.waitForTasksToCompleteOnShutdown(true) .waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L) .awaitTerminationMillis(5000L)
/** /**
* {@link TaskDecoratorTest} * Context passing, test cases: {@link TaskDecoratorTest}
*/ */
.taskDecorator(new TaskDecoratorTest.ContextCopyingDecorator()) .taskDecorator(new TaskDecoratorTest.ContextCopyingDecorator())
.build(); .build();
return produceExecutor; return produceExecutor;
} }
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlDynamicThreadPool() {
String threadPoolId = MESSAGE_CONSUME;
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(800L)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
Executor ttlExecutor = TtlExecutors.getTtlExecutor(customExecutor);
return ttlExecutor;
}
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlServiceDynamicThreadPool() {
String threadPoolId = MESSAGE_CONSUME;
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(800L)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
Executor ttlExecutor = TtlExecutors.getTtlExecutorService(customExecutor);
return ttlExecutor;
}
} }

@ -19,9 +19,6 @@ package cn.hippo4j.example.core.constant;
/** /**
* Global test variables. * Global test variables.
*
* @author chen.ma
* @date 2021/8/15 21:06
*/ */
public class GlobalTestConstant { public class GlobalTestConstant {

@ -25,10 +25,7 @@ import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
* . * Custom Deny Policy.
*
* @author chen.ma
* @date 2022/1/4 22:19
*/ */
public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecutionHandler { public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecutionHandler {

@ -25,9 +25,6 @@ import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
/** /**
* Task trace builder handler. * Task trace builder handler.
*
* @author chen.ma
* @date 2022/3/2 20:46
*/ */
public final class TaskTraceBuilderHandler implements TaskDecorator { public final class TaskTraceBuilderHandler implements TaskDecorator {
@ -39,7 +36,7 @@ public final class TaskTraceBuilderHandler implements TaskDecorator {
MDC.put(EXECUTE_TIMEOUT_TRACE, executeTimeoutTrace); MDC.put(EXECUTE_TIMEOUT_TRACE, executeTimeoutTrace);
} }
runnable.run(); runnable.run();
// 此处不用进行清理操作, 统一在线程任务执行后清理 // There is no need to clean up here, and it will be cleaned up after the thread task is executed.
}; };
return taskRun; return taskRun;
} }

@ -31,17 +31,14 @@ import java.util.concurrent.TimeUnit;
/** /**
* Test alarm send message. * Test alarm send message.
*
* @author chen.ma
* @date 2021/8/15 21:03
*/ */
@Slf4j @Slf4j
@Component @Component
public class AlarmSendMessageTest { public class AlarmSendMessageTest {
/** /**
* . * Test alarm notification.
* , @PostConstruct * If you need to run this single test, add @PostConstruct to the method.
*/ */
@SuppressWarnings("all") @SuppressWarnings("all")
public void alarmSendMessageTest() { public void alarmSendMessageTest() {

@ -19,8 +19,11 @@ package cn.hippo4j.example.core.inittest;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.enums.NotifyPlatformEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -39,18 +42,37 @@ public class RegisterDynamicThreadPoolTest {
String threadPoolId = "register-dynamic-thread-pool"; String threadPoolId = "register-dynamic-thread-pool";
DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter();
parameterInfo.setThreadPoolId(threadPoolId); parameterInfo.setThreadPoolId(threadPoolId);
parameterInfo.setThreadNamePrefix(threadPoolId);
parameterInfo.setCorePoolSize(3); parameterInfo.setCorePoolSize(3);
parameterInfo.setMaximumPoolSize(14); parameterInfo.setMaximumPoolSize(14);
parameterInfo.setQueueType(9); parameterInfo.setQueueType(9);
parameterInfo.setCapacity(110); parameterInfo.setCapacity(110);
parameterInfo.setKeepAliveTime(110); parameterInfo.setKeepAliveTime(110L);
parameterInfo.setRejectedType(2); parameterInfo.setRejectedType(2);
parameterInfo.setIsAlarm(0); parameterInfo.setIsAlarm(0);
parameterInfo.setCapacityAlarm(90); parameterInfo.setCapacityAlarm(90);
parameterInfo.setLivenessAlarm(90); parameterInfo.setActiveAlarm(90);
parameterInfo.setAllowCoreThreadTimeOut(0); parameterInfo.setAllowCoreThreadTimeOut(Boolean.TRUE);
// core 模式和 server 模式,各选其一即可
DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder()
.activeAlarm(80)
.capacityAlarm(80)
.receives("chen.ma")
.alarm(true)
.interval(5)
.build();
DynamicThreadPoolRegisterServerNotifyParameter serverNotifyParameter = DynamicThreadPoolRegisterServerNotifyParameter.builder()
.platform(NotifyPlatformEnum.WECHAT.name())
.secretKey("xxx")
.threadPoolId(threadPoolId)
.interval(5)
.receives("chen.ma")
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo) .dynamicThreadPoolRegisterParameter(parameterInfo)
.dynamicThreadPoolRegisterCoreNotifyParameter(coreNotifyParameter)
.dynamicThreadPoolRegisterServerNotifyParameter(serverNotifyParameter)
.build(); .build();
ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper); ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper);
log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool)); log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool));

@ -25,25 +25,19 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE; import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
/** /**
* Test run time metrics. * Run state handler test.
*
* @author chen.ma
* @date 2021/8/15 21:00
*/ */
@Slf4j @Slf4j
@Component @Component
public class RunStateHandlerTest { public class RunStateHandlerTest {
@Resource @Resource
private ThreadPoolExecutor messageConsumeDynamicThreadPool; private Executor messageConsumeTtlDynamicThreadPool;
@Resource @Resource
private ThreadPoolExecutor messageProduceDynamicThreadPool; private ThreadPoolExecutor messageProduceDynamicThreadPool;
@ -66,24 +60,22 @@ public class RunStateHandlerTest {
@SuppressWarnings("all") @SuppressWarnings("all")
public void runStateHandlerTest() { public void runStateHandlerTest() {
log.info("Test thread pool runtime state interface..."); log.info("Test thread pool runtime state interface...");
// Start the dynamic thread pool to simulate running tasks
// 启动动态线程池模拟运行任务 runTask(messageConsumeTtlDynamicThreadPool);
runTask(messageConsumeDynamicThreadPool);
// 启动动态线程池模拟运行任务
runTask(messageProduceDynamicThreadPool); runTask(messageProduceDynamicThreadPool);
} }
private void runTask(ExecutorService executorService) { private void runTask(Executor executor) {
// 模拟任务运行 // Simulate task run
runStateHandlerTestExecutor.execute(() -> { runStateHandlerTestExecutor.execute(() -> {
/** /**
* 线, MDC Trace , . * When the execution of the thread pool task times out, the Trace flag is put into the MDC, and it is printed out when an alarm occurs.
*/ */
MDC.put(EXECUTE_TIMEOUT_TRACE, "https://github.com/opengoofy/hippo4j 感觉不错来个 Star."); MDC.put(EXECUTE_TIMEOUT_TRACE, "https://github.com/opengoofy/hippo4j 感觉不错来个 Star.");
ThreadUtil.sleep(5000); ThreadUtil.sleep(5000);
for (int i = 0; i < Integer.MAX_VALUE; i++) { for (int i = 0; i < Integer.MAX_VALUE; i++) {
try { try {
executorService.execute(() -> { executor.execute(() -> {
try { try {
int maxRandom = 10; int maxRandom = 10;
int temp = 2; int temp = 2;
@ -94,12 +86,10 @@ public class RunStateHandlerTest {
} else { } else {
Thread.sleep(3000); Thread.sleep(3000);
} }
} catch (InterruptedException e) { } catch (InterruptedException ignored) {
// ignore
} }
}); });
} catch (Exception ex) { } catch (Exception ignored) {
// ignore
} }
ThreadUtil.sleep(500); ThreadUtil.sleep(500);
} }

@ -32,9 +32,6 @@ import java.util.concurrent.TimeUnit;
/** /**
* TaskDecorator test. * TaskDecorator test.
*
* @author chen.ma
* @date 2021/11/28 13:01
*/ */
@Slf4j @Slf4j
@Component @Component
@ -57,21 +54,17 @@ public class TaskDecoratorTest {
new ThreadPoolExecutor.AbortPolicy()); new ThreadPoolExecutor.AbortPolicy());
/** /**
* 线 {@link TaskDecorator} * Test dynamic thread pool passing {@link TaskDecorator}
* , @PostConstruct * If you need to run this single test, add @PostConstruct to the method.
*/ */
public void taskDecoratorTest() { public void taskDecoratorTest() {
taskDecoratorTestExecutor.execute(() -> { taskDecoratorTestExecutor.execute(() -> {
MDC.put(PLACEHOLDER, "查看官网: https://www.hippox.cn"); MDC.put(PLACEHOLDER, "View the official website: https://www.hippo4j.cn");
ThreadUtil.sleep(5000); ThreadUtil.sleep(5000);
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE); DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE);
ThreadPoolExecutor threadPoolExecutor = poolWrapper.getExecutor(); ThreadPoolExecutor threadPoolExecutor = poolWrapper.getExecutor();
threadPoolExecutor.execute(() -> { threadPoolExecutor.execute(() -> {
/** log.info("Pass context via taskDecorator MDC: {}", MDC.get(PLACEHOLDER));
* , taskDecorator .
* taskDecorator {@link ThreadPoolConfig#messageConsumeDynamicThreadPool()}
*/
log.info("通过 taskDecorator MDC 传递上下文: {}", MDC.get(PLACEHOLDER));
}); });
}); });
} }

@ -26,9 +26,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/** /**
* @author : wh * RabbitMQ thread-pool config.
* @date : 2022/5/24 10:02
* @description:
*/ */
@Configuration @Configuration
public class RabbitMQThreadPoolConfig { public class RabbitMQThreadPoolConfig {
@ -36,11 +34,11 @@ public class RabbitMQThreadPoolConfig {
@Bean @Bean
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() { public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 指定线程的最大数量 // Specify the maximum number of threads.
executor.setMaxPoolSize(5); executor.setMaxPoolSize(5);
// 指定线程池维护线程的最少数量 // Specifies the minimum number of thread pool maintenance threads.
executor.setCorePoolSize(5); executor.setCorePoolSize(5);
// 指定等待处理的任务数 // Specifies the number of tasks waiting to be processed.
executor.setQueueCapacity(1000); executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-"); executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
return executor; return executor;

@ -35,7 +35,7 @@ spring.dynamic.thread-pool.executors[0].rejected-handler=CallerRunsPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=1024 spring.dynamic.thread-pool.executors[0].keep-alive-time=1024
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=untimely-thread-pool spring.dynamic.thread-pool.executors[0].thread-name-prefix=untimely-thread-pool
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true spring.dynamic.thread-pool.executors[0].notify.alarm=true
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=20 spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=20
spring.dynamic.thread-pool.executors[0].notify.interval=2 spring.dynamic.thread-pool.executors[0].notify.interval=2
spring.dynamic.thread-pool.executors[0].notify.receive=xxx spring.dynamic.thread-pool.executors[0].notify.receive=xxx

Loading…
Cancel
Save