Merge branch 'develop' into bruceyan/issue#1216

# Conflicts:
#	agents/threadpool/hippo4j-agent-core/pom.xml
pull/1295/head
yanrongzhen 2 years ago
commit e310094f1e

5
.gitignore vendored

@ -46,3 +46,8 @@ build/
docs/node_modules
docs/build
docs/.docusaurus
### Docker ###
docker/threadpool/conf
docker/threadpool/target

@ -18,7 +18,6 @@
package cn.hippo4j.agent.bootstrap;
import cn.hippo4j.agent.core.boot.AgentPackageNotFoundException;
import cn.hippo4j.agent.core.boot.DefaultNamedThreadFactory;
import cn.hippo4j.agent.core.boot.ServiceManager;
import cn.hippo4j.agent.core.conf.Config;
import cn.hippo4j.agent.core.conf.SnifferConfigInitializer;
@ -49,15 +48,14 @@ import java.security.ProtectionDomain;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static net.bytebuddy.matcher.ElementMatchers.nameContains;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.not;
/**
* Hippo4j Agent
*/
public class Hippo4jAgent {
private static ILog LOGGER = LogManager.getLogger(Hippo4jAgent.class);
@ -151,6 +149,9 @@ public class Hippo4jAgent {
.addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "hippo4j service shutdown thread"));
}
/**
* transformer
*/
private static class Transformer implements AgentBuilder.Transformer {
private PluginFinder pluginFinder;
@ -193,6 +194,9 @@ public class Hippo4jAgent {
return nameStartsWith("cn.hippo4j").and(not(nameStartsWith("cn.hippo4j.agent.toolkit.")));
}
/**
* listener
*/
private static class Listener implements AgentBuilder.Listener {
@Override
@ -235,6 +239,9 @@ public class Hippo4jAgent {
}
}
/**
* redefinition listener
*/
private static class RedefinitionListener implements AgentBuilder.RedefinitionStrategy.Listener {
@Override

@ -75,7 +75,9 @@
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-common</artifactId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

@ -20,6 +20,9 @@ package cn.hippo4j.agent.plugin.apollo.boot;
import cn.hippo4j.agent.core.boot.BootService;
import cn.hippo4j.agent.core.boot.DefaultImplementor;
/**
* Apollo plugin boot service
*/
@DefaultImplementor
public class ApolloPluginBootService implements BootService {

@ -27,6 +27,9 @@ import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.any;
/**
* Apollo instrumentation
*/
public class ApolloInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.ctrip.framework.apollo.internals.DefaultConfig";

@ -20,6 +20,9 @@ package cn.hippo4j.agent.plugin.apollo.interceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* Default config constructor interceptor
*/
public class DefaultConfigConstructorInterceptor implements InstanceConstructorInterceptor {
@Override

@ -36,6 +36,9 @@ import java.util.Set;
import static cn.hippo4j.config.springboot1x.starter.refresher.SpringBoot1xBootstrapConfigPropertiesBinderAdapt.getNames;
/**
* Dynamic thread pool change handler spring 1x
*/
public class DynamicThreadPoolChangeHandlerSpring1x extends AbstractDynamicThreadPoolChangeHandlerSpring {
public DynamicThreadPoolChangeHandlerSpring1x(ConfigurableApplicationContext context) {

@ -31,6 +31,9 @@ import java.util.List;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static cn.hippo4j.agent.core.plugin.match.NameMatch.byName;
/**
* Event publishing run listener instrumentation
*/
public class EventPublishingRunListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.boot.context.event.EventPublishingRunListener";

@ -32,6 +32,9 @@ import org.springframework.context.ConfigurableApplicationContext;
import java.lang.reflect.Method;
/**
* Event publishing finished interceptor
*/
public class EventPublishingFinishedInterceptor implements InstanceMethodsAroundInterceptor {
private static final ILog FILE_LOGGER = LogManager.getLogger(EventPublishingFinishedInterceptor.class);

@ -28,6 +28,9 @@ import org.springframework.context.ConfigurableApplicationContext;
import java.util.Map;
/**
* Dynamic thread pool change handler spring 2x
*/
public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractDynamicThreadPoolChangeHandlerSpring {
public DynamicThreadPoolChangeHandlerSpring2x(ConfigurableApplicationContext context) {

@ -31,6 +31,9 @@ import java.util.List;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static cn.hippo4j.agent.core.plugin.match.NameMatch.byName;
/**
* Event publishing run listener instrumentation
*/
public class EventPublishingRunListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.boot.context.event.EventPublishingRunListener";

@ -33,6 +33,9 @@ import org.springframework.context.ConfigurableApplicationContext;
import java.lang.reflect.Method;
import java.util.Map;
/**
* Event publishing started interceptor
*/
public class EventPublishingStartedInterceptor implements InstanceMethodsAroundInterceptor {
private static final ILog LOGGER = LogManager.getLogger(EventPublishingStartedInterceptor.class);

@ -22,15 +22,30 @@ import cn.hippo4j.agent.core.boot.SpringBootConfigNode;
import java.util.Arrays;
import java.util.List;
/**
* Spring boot config
*/
public class SpringBootConfig {
/**
* Spring
*/
public static class Spring {
/**
* Dynamic
*/
public static class Dynamic {
/**
* ThreadPool
*/
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Thread_Pool {
/**
* Apollo
*/
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Apollo {

@ -27,6 +27,9 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.lang.reflect.Method;
/**
* Event publishing run listener environment prepared interceptor
*/
public class EventPublishingRunListenerEnvironmentPreparedInterceptor implements InstanceMethodsAroundInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishingRunListenerEnvironmentPreparedInterceptor.class);

@ -56,6 +56,9 @@ import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
/**
* Abstract dynamic thread poo change handler spring
*/
public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements IDynamicThreadPoolChangeHandlerSpring {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDynamicThreadPoolChangeHandlerSpring.class);

@ -17,6 +17,9 @@
package cn.hippo4j.agent.plugin.spring.common.support;
/**
* IDynamic thread pool change handler spring
*/
public interface IDynamicThreadPoolChangeHandlerSpring {
void registerApolloConfigHandler();

@ -23,6 +23,9 @@ import org.springframework.core.env.MapPropertySource;
import java.util.HashMap;
import java.util.Map;
/**
* Spring environment support
*/
public class SpringEnvironmentSupport {
public static void disableNonAgentSwitch(ConfigurableEnvironment environment) {

@ -29,6 +29,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
/**
* Spring properties loader
*/
public class SpringPropertiesLoader {
private static final ILog LOGGER = LogManager.getLogger(SpringPropertiesLoader.class);

@ -20,6 +20,7 @@ package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.core.registry.AgentThreadPoolInstanceRegistry;
import cn.hippo4j.agent.core.util.ReflectUtil;
import cn.hippo4j.agent.core.util.ThreadPoolPropertyKey;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.BooleanUtil;
@ -38,6 +39,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Spring thread pool register support
*/
public class SpringThreadPoolRegisterSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringThreadPoolRegisterSupport.class);
@ -96,7 +100,7 @@ public class SpringThreadPoolRegisterSupport {
properties.put(ThreadPoolPropertyKey.QUEUE_CAPACITY, executor.getQueue().remainingCapacity());
properties.put(ThreadPoolPropertyKey.THREAD_NAME_PREFIX, threadPoolId);
properties.put(ThreadPoolPropertyKey.REJECTED_HANDLER, RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName());
properties.put(ThreadPoolPropertyKey.EXECUTE_TIME_OUT, 10000L);
properties.put(ThreadPoolPropertyKey.EXECUTE_TIME_OUT, Constants.EXECUTE_TIME_OUT);
// register executor.
AgentThreadPoolInstanceRegistry.getInstance().putHolder(threadPoolId, executor, properties);

@ -20,7 +20,7 @@
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-common</artifactId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

@ -27,6 +27,9 @@ import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
/**
* Thread pool executor instrumentation
*/
public class ThreadPoolExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "java.util.concurrent.ThreadPoolExecutor";

@ -31,12 +31,17 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Thread pool executor constructor method interceptor
*/
public class ThreadPoolExecutorConstructorMethodInterceptor implements InstanceConstructorInterceptor {
private static final ILog LOGGER = LogManager.getLogger(ThreadPoolExecutorConstructorMethodInterceptor.class);
private static final List<String> EXCLUDE_STACK_TRACE_ELEMENT_CLASS_PREFIX = Arrays.asList("java", "cn.hippo4j.agent");
private static final int INITIAL_CAPACITY = 3;
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
@ -64,8 +69,8 @@ public class ThreadPoolExecutorConstructorMethodInterceptor implements InstanceC
}
}
List<StackTraceElement> result = new ArrayList<>(3); // Find up to three layers
for (int j = 0; i < stackTraceElements.length && j < 3; i++, j++) {
List<StackTraceElement> result = new ArrayList<>(INITIAL_CAPACITY); // Find up to three layers
for (int j = 0; i < stackTraceElements.length && j < INITIAL_CAPACITY; i++, j++) {
String fullClassName = stackTraceElements[i].getClassName();
if (isExcludeThreadPoolClass(fullClassName)) {
break;

@ -75,7 +75,6 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>

@ -29,4 +29,7 @@
<suppress checks="MagicNumber" files="UndertowWebThreadPoolHandler.java"/>
<suppress checks="MagicNumber" files="DashboardServiceImpl.java"/>
<suppress checks="MagicNumber" files="DefaultThreadPoolCheckAlarmHandler.java"/>
<suppress checks="TypeName" files="SpringBootConfig.java"/>
<suppress checks="ReturnCount" files="Hippo4jAgent.java"/>
<suppress checks="StaticVariableName" files="Hippo4jAgent.java"/>
</suppressions>

@ -34,10 +34,11 @@ hippo4j/hippo4j-server
方式一:
```shell
# 进入到 hippo4j-server/hippo4j-bootstrap 工程路径下
# 进入到 threadpool/server/bootstrap 工程路径下
mvn clean package -Dskip.spotless.apply=true
# 进入到 docker/threadpool 工程路径下
# 默认打包是打包的 tag 是 latest
docker build -t hippo4j/hippo4j-server ../hippo4j-bootstrap
docker build -t hippo4j/hippo4j-server ../threadpool
```
方式二:
@ -45,6 +46,6 @@ docker build -t hippo4j/hippo4j-server ../hippo4j-bootstrap
通过 `maven docker plugin`
```shell
# 进入到 hippo4j-server 工程路径下
# 进入到 threadpool/server/bootstrap 工程路径下
mvn clean package -DskipTests -Dskip.spotless.apply=true docker:build
```

@ -34,10 +34,11 @@ hippo4j/hippo4j-server
方式一:
```shell
# 进入到 hippo4j-server/hippo4j-bootstrap 工程路径下
# 进入到 threadpool/server/bootstrap 工程路径下
mvn clean package -Dskip.spotless.apply=true
# 进入到 docker/threadpool 工程路径下
# 默认打包是打包的 tag 是 latest
docker build -t hippo4j/hippo4j-server ../hippo4j-bootstrap
docker build -t hippo4j/hippo4j-server ../threadpool
```
方式二:
@ -45,6 +46,6 @@ docker build -t hippo4j/hippo4j-server ../hippo4j-bootstrap
通过 `maven docker plugin`
```shell
# 进入到 hippo4j-server 工程路径下
# 进入到 threadpool/server/bootstrap 工程路径下
mvn clean package -DskipTests -Dskip.spotless.apply=true docker:build
```

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config apollo spring boot 1x example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigApolloSpringBoot1xExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config apollo example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigApolloExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config consul example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example")
public class ConfigConsulExampleApplication {

@ -17,6 +17,7 @@
package cn.hippo4j.example.config.etcd.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@ -29,6 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor;
* @date : 2022/9/2 19:18
* @description:
*/
@Slf4j
@RestController
@RequestMapping
public class TestController {
@ -38,6 +40,6 @@ public class TestController {
@GetMapping("test")
public void test() {
System.out.println(messageConsumeDynamicExecutor.getMaximumPoolSize());
log.info(String.valueOf(messageConsumeDynamicExecutor.getMaximumPoolSize()));
}
}

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config nacos spring boot 1.5 example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigNacosSpringBoot15ExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config nacos example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigNacosExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config zookeeper example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigZookeeperExampleApplication {

@ -50,6 +50,16 @@ public class DynamicThreadPoolConfig {
public static final ThreadPoolExecutor FIELD2 = new ThreadPoolExecutor(10, 20,
1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(20));
private static final long EXECUTE_TIMEOUT = 800L;
private static final long AWAIT_TERMINATION_MILLIS = 5000L;
private static final int MAX_QUEUE_CAPACITY = 200;
private static final int CORE_POOL_SIZE = AVAILABLE_PROCESSORS * 2;
private static final int MAX_POOL_SIZE = AVAILABLE_PROCESSORS * 4;
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlDynamicThreadPool() {
@ -58,9 +68,9 @@ public class DynamicThreadPoolConfig {
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(800L)
.executeTimeOut(EXECUTE_TIMEOUT)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.awaitTerminationMillis(AWAIT_TERMINATION_MILLIS)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
// Ali ttl adaptation use case.
@ -86,10 +96,9 @@ public class DynamicThreadPoolConfig {
public ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("test-spring-task-executor_");
int maxQueueCapacity = 200;
threadPoolTaskExecutor.setCorePoolSize(AVAILABLE_PROCESSORS * 2);
threadPoolTaskExecutor.setMaxPoolSize(AVAILABLE_PROCESSORS * 4);
threadPoolTaskExecutor.setQueueCapacity(maxQueueCapacity);
threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
threadPoolTaskExecutor.setQueueCapacity(MAX_QUEUE_CAPACITY);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setTaskDecorator(new TaskDecoratorTest.ContextCopyingDecorator());
return threadPoolTaskExecutor;

@ -17,7 +17,7 @@
package cn.hippo4j.example.core.handler;
import cn.hippo4j.common.api.ClientNetworkService;
import cn.hippo4j.core.api.ClientNetworkService;
import org.springframework.core.env.ConfigurableEnvironment;
/**

@ -29,9 +29,11 @@ import java.util.concurrent.ThreadPoolExecutor;
*/
public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecutionHandler {
private static final int REJECTED_TYPE = 12;
@Override
public Integer getType() {
return 12;
return REJECTED_TYPE;
}
@Override
@ -39,6 +41,9 @@ public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecution
return new CustomErrorLogRejectedExecutionHandler();
}
/**
* Custom Error Log Rejected Execution Handler
*/
public static class CustomErrorLogRejectedExecutionHandler implements RejectedExecutionHandler {
@Override

@ -35,6 +35,10 @@ import java.util.concurrent.TimeUnit;
@Component
public class AlarmSendMessageTest {
private static final int SLEEP_TIME = 10240124;
private static final int INITIAL_DELAY = 3;
/**
* Test alarm notification.
* If you need to run this single test, add @PostConstruct to the method.
@ -48,7 +52,7 @@ public class AlarmSendMessageTest {
try {
poolExecutor.execute(() -> {
try {
Thread.sleep(10240124);
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -56,6 +60,6 @@ public class AlarmSendMessageTest {
} catch (Exception ex) {
log.error("Throw reject policy.", ex.getMessage());
}
}, 3, 1, TimeUnit.SECONDS);
}, INITIAL_DELAY, 1, TimeUnit.SECONDS);
}
}

@ -35,33 +35,41 @@ import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class RegisterDynamicThreadPoolTest {
private static final int CAPACITY = 1024;
private static final long KEEP_ALIVE_TIME = 1024L;
private static final long EXECUTE_TIMEOUT = 1024L;
private static final int CAPACITY_ALARM = 90;
private static final int ACTIVE_ALARM = 90;
private static final int CORE_NOTIFY_INTERVAL = 5;
private static final int SERVER_NOTIFY_INTERVAL = 10;
public static ThreadPoolExecutor registerDynamicThreadPool(String threadPoolId) {
DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder()
.corePoolSize(1)
.maximumPoolSize(2)
.blockingQueueType(BlockingQueueTypeEnum.LINKED_BLOCKING_QUEUE)
.capacity(1024)
.capacity(CAPACITY)
// TimeUnit.SECONDS
.keepAliveTime(1024L)
.keepAliveTime(KEEP_ALIVE_TIME)
// TimeUnit.MILLISECONDS
.executeTimeOut(1024L)
.executeTimeOut(EXECUTE_TIMEOUT)
.rejectedPolicyType(RejectedPolicyTypeEnum.DISCARD_POLICY)
.isAlarm(true)
.allowCoreThreadTimeOut(false)
.capacityAlarm(90)
.activeAlarm(90)
.capacityAlarm(CAPACITY_ALARM)
.activeAlarm(ACTIVE_ALARM)
.threadPoolId(threadPoolId)
.threadNamePrefix(threadPoolId)
.build();
// Core mode and server mode, you can choose one of them.
DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder()
.receives("chen.ma")
.interval(5)
.interval(CORE_NOTIFY_INTERVAL)
.build();
DynamicThreadPoolRegisterServerNotifyParameter serverNotifyParameter = DynamicThreadPoolRegisterServerNotifyParameter.builder()
.platform(NotifyPlatformEnum.WECHAT.name())
.accessToken("7487d0a0-20ec-40ab-b67b-ce68db406b37")
.interval(10)
.interval(SERVER_NOTIFY_INTERVAL)
.receives("chen.ma")
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()

@ -44,6 +44,12 @@ public class RunStateHandlerTest {
@Resource
private ThreadPoolExecutor messageProduceDynamicThreadPool;
private static final int MAX_RANDOM = 10;
private static final int SLEEP_500 = 500;
private static final int SLEEP_1000 = 1000;
private static final int SLEEP_3000 = 3000;
private static final int SLEEP_5000 = 5000;
/*
* @Resource private ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor;
*/
@ -82,7 +88,7 @@ public class RunStateHandlerTest {
*/
MDC.put(EXECUTE_TIMEOUT_TRACE, "39948722194639841.251.16612352194691531");
try {
Thread.sleep(5000);
Thread.sleep(SLEEP_5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -90,14 +96,14 @@ public class RunStateHandlerTest {
try {
executor.execute(() -> {
try {
int maxRandom = 10;
int maxRandom = MAX_RANDOM;
int temp = 2;
Random random = new Random();
// Assignment thread pool completedTaskCount
if (random.nextInt(maxRandom) % temp == 0) {
Thread.sleep(1000);
Thread.sleep(SLEEP_1000);
} else {
Thread.sleep(3000);
Thread.sleep(SLEEP_3000);
}
} catch (InterruptedException ignored) {
}
@ -105,7 +111,7 @@ public class RunStateHandlerTest {
} catch (Exception ignored) {
}
try {
Thread.sleep(500);
Thread.sleep(SLEEP_500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

@ -38,6 +38,8 @@ public class TaskDecoratorTest {
public static final String PLACEHOLDER = "site";
private static final int SLEEP_TIME = 5000;
private final ThreadPoolExecutor taskDecoratorTestExecutor = new ThreadPoolExecutor(
1,
1,
@ -60,7 +62,7 @@ public class TaskDecoratorTest {
taskDecoratorTestExecutor.execute(() -> {
MDC.put(PLACEHOLDER, "View the official website: https://www.hippo4j.cn");
try {
Thread.sleep(5000);
Thread.sleep(SLEEP_TIME);
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE);
ThreadPoolExecutor threadPoolExecutor = poolWrapper.getExecutor();
threadPoolExecutor.execute(() -> log.info("Pass context via taskDecorator MDC: {}", MDC.get(PLACEHOLDER)));
@ -70,6 +72,9 @@ public class TaskDecoratorTest {
});
}
/**
* Context Copying Decorator
*/
public static class ContextCopyingDecorator implements TaskDecorator {
@Override

@ -39,6 +39,8 @@ public class MessageProduce {
public static final String TOPIC = "test";
private static final int SLEEP_TIME = 3;
private final KafkaTemplate template;
@GetMapping("/message/send")
@ -54,9 +56,9 @@ public class MessageProduce {
String message = UUID.randomUUID().toString();
template.send(TOPIC, "autoTestMessage " + message);
try {
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
});

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server adapter kafka example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.kafka.example"})
public class ServerAdapterKafkaExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server Adapter RabbitMQ Example Application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.rabbitmq.example"})
public class ServerAdapterRabbitMQExampleApplication {

@ -31,15 +31,23 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class RabbitMQThreadPoolConfig {
private static final int MAX_POOL_SIZE = 5;
private static final int CORE_POOL_SIZE = 5;
private static final int QUEUE_CAPACITY = 1000;
private static final int CONSUMERS_PER_QUEUE = 10;
@Bean
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Specify the maximum number of threads.
executor.setMaxPoolSize(5);
executor.setMaxPoolSize(MAX_POOL_SIZE);
// Specifies the minimum number of thread pool maintenance threads.
executor.setCorePoolSize(5);
executor.setCorePoolSize(CORE_POOL_SIZE);
// Specifies the number of tasks waiting to be processed.
executor.setQueueCapacity(1000);
executor.setQueueCapacity(QUEUE_CAPACITY);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
return executor;
}
@ -50,7 +58,7 @@ public class RabbitMQThreadPoolConfig {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(abstractConnectionFactory);
factory.setMessageConverter(messageConverter);
factory.setConsumersPerQueue(10);
factory.setConsumersPerQueue(CONSUMERS_PER_QUEUE);
abstractConnectionFactory.setExecutor(rabbitListenerTaskExecutor);
return factory;
}

@ -20,7 +20,7 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants;
/**
* Simple MQ constant.
*/
public interface SimpleMQConstant {
public class SimpleMQConstant {
String QUEUE_NAME = "framework_message-center_queue";
public static final String QUEUE_NAME = "framework_message-center_queue";
}

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server adapter rocketMQ example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.rocketmq.example"})
public class ServerAdapterRocketMQExampleApplication {

@ -37,10 +37,12 @@ public class MessageProduce {
private final StreamBridge streamBridge;
private static final int MAX_SEND_SIZE = 10;
@GetMapping("/message/send")
public String sendMessage(@RequestParam(required = false) Integer maxSendSize) {
if (maxSendSize == null) {
maxSendSize = 10;
maxSendSize = MAX_SEND_SIZE;
}
for (int i = 0; i < maxSendSize; i++) {
sendMessage0();

@ -29,6 +29,9 @@ import org.springframework.messaging.MessageHeaders;
import java.util.function.Consumer;
/**
* Server Adapter Spring Cloud Stream RabbitMQ Application
*/
@Slf4j
@EnableDynamicThreadPool
@SpringBootApplication

@ -39,6 +39,7 @@ public class MessageConsume {
long startTime = System.currentTimeMillis();
try {
// ignore
log.info("Message: {}", JSONUtil.toJSONString(message));
} finally {
log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime,
JSONUtil.toJSONString(message));
@ -51,6 +52,7 @@ public class MessageConsume {
long startTime = System.currentTimeMillis();
try {
// ignore
log.info("Message: {}", JSONUtil.toJSONString(message));
} finally {
log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime,
JSONUtil.toJSONString(message));

@ -43,10 +43,13 @@ public class MessageProduce {
public static final String MESSAGE_CENTER_SAVE_MESSAGE_TAG = "framework_message-center_save-message_tag";
private static final int MAX_SEND_SIZE = 10;
private static final long SEND_TIMEOUT = 2000L;
@GetMapping("/message/send")
public String sendMessage() {
int maxSendSize = 10;
for (int i = 0; i < maxSendSize; i++) {
for (int i = 0; i < MAX_SEND_SIZE; i++) {
sendMessage(MESSAGE_CENTER_SEND_MESSAGE_TAG);
sendMessage(MESSAGE_CENTER_SAVE_MESSAGE_TAG);
}
@ -67,7 +70,7 @@ public class MessageProduce {
long startTime = System.currentTimeMillis();
boolean sendResult = false;
try {
sendResult = output.send(message, 2000L);
sendResult = output.send(message, SEND_TIMEOUT);
} finally {
log.info("Send status: {}, Keys: {}, Execute time: {} ms, Message: {}",
sendResult,

@ -24,6 +24,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
/**
* Server Adapter Spring Cloud Stream RocketMQ Application
*/
@Slf4j
@EnableDynamicThreadPool
@EnableBinding({Source.class, MySink.class})

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server es monitor example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.monitor"})
public class ServerEsMonitorExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server Example Application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.example.server"})
public class ServerExampleApplication {

@ -4,24 +4,34 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool</artifactId>
<artifactId>hippo4j-infra</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-threadpool-common</artifactId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.github.dozermapper</groupId>
<artifactId>dozer-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Unit test start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@ -37,22 +47,11 @@
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.github.dozermapper</groupId>
<artifactId>dozer-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Unit test end -->
</dependencies>
</project>

@ -28,9 +28,9 @@ public interface ClientCloseHookExecute {
/**
* Client close hook function execution.
*
* @param req
* @param requestParam
*/
void closeHook(ClientCloseHookReq req);
void closeHook(ClientCloseHookReq requestParam);
/**
* Client close hook req.

@ -131,4 +131,6 @@ public class Constants {
public static final String CLIENT_VERSION = "Client-Version";
public static final String CONFIGURATION_PROPERTIES_PREFIX = "spring.dynamic.thread-pool";
public static final long NO_REJECT_COUNT_NUM = -1L;
}

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.constant;
public class MagicNumberConstants {
public static final int INDEX_0 = 0;
public static final int INDEX_1 = 1;
public static final int INDEX_2 = 2;
public static final int INDEX_3 = 3;
public static final int SIZE_4 = 4;
}

@ -17,7 +17,7 @@
package cn.hippo4j.common.executor.support;
import cn.hippo4j.common.extension.support.ServiceLoaderRegistry;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import lombok.Getter;
import java.util.Collection;

@ -17,12 +17,10 @@
package cn.hippo4j.common.executor.support;
import cn.hippo4j.common.extension.support.ServiceLoaderRegistry;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import lombok.Getter;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Stream;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.common.extension.support;
package cn.hippo4j.common.extension.spi;
/**
* Service loader instantiation exception.

@ -15,9 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.common.extension.support;
import cn.hippo4j.common.extension.annotation.SingletonSPI;
package cn.hippo4j.common.extension.spi;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
@ -31,7 +29,6 @@ import java.util.stream.Collectors;
/**
* Dynamic thread-pool SPI service loader.
*/
@Deprecated
public class ServiceLoaderRegistry {
/**

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.common.extension.annotation;
package cn.hippo4j.common.extension.spi;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -27,6 +27,5 @@ import java.lang.annotation.Target;
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Deprecated
public @interface SingletonSPI {
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save