optimize: fix StyleCheck errors for hippo4j (#1291)

* optimize: fix StyleCheck errors for hippo4j

* ignore TypeName error
pull/1293/head
Dmego 2 years ago committed by GitHub
parent 19d4a85777
commit ea19be3cb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,6 +20,9 @@ package cn.hippo4j.agent.plugin.apollo.boot;
import cn.hippo4j.agent.core.boot.BootService;
import cn.hippo4j.agent.core.boot.DefaultImplementor;
/**
* Apollo plugin boot service
*/
@DefaultImplementor
public class ApolloPluginBootService implements BootService {

@ -27,6 +27,9 @@ import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.any;
/**
* Apollo instrumentation
*/
public class ApolloInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.ctrip.framework.apollo.internals.DefaultConfig";

@ -20,6 +20,9 @@ package cn.hippo4j.agent.plugin.apollo.interceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* Default config constructor interceptor
*/
public class DefaultConfigConstructorInterceptor implements InstanceConstructorInterceptor {
@Override

@ -36,6 +36,9 @@ import java.util.Set;
import static cn.hippo4j.config.springboot1x.starter.refresher.SpringBoot1xBootstrapConfigPropertiesBinderAdapt.getNames;
/**
* Dynamic thread pool change handler spring 1x
*/
public class DynamicThreadPoolChangeHandlerSpring1x extends AbstractDynamicThreadPoolChangeHandlerSpring {
public DynamicThreadPoolChangeHandlerSpring1x(ConfigurableApplicationContext context) {

@ -27,6 +27,9 @@ import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static cn.hippo4j.agent.core.plugin.match.NameMatch.byName;
/**
* Event publishing run listener instrumentation
*/
public class EventPublishingRunListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.boot.context.event.EventPublishingRunListener";

@ -32,6 +32,9 @@ import org.springframework.context.ConfigurableApplicationContext;
import java.lang.reflect.Method;
/**
* Event publishing finished interceptor
*/
public class EventPublishingFinishedInterceptor implements InstanceMethodsAroundInterceptor {
private static final ILog FILE_LOGGER = LogManager.getLogger(EventPublishingFinishedInterceptor.class);

@ -28,6 +28,9 @@ import org.springframework.context.ConfigurableApplicationContext;
import java.util.Map;
/**
* Dynamic thread pool change handler spring 2x
*/
public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractDynamicThreadPoolChangeHandlerSpring {
public DynamicThreadPoolChangeHandlerSpring2x(ConfigurableApplicationContext context) {

@ -27,6 +27,9 @@ import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static cn.hippo4j.agent.core.plugin.match.NameMatch.byName;
/**
* Event publishing run listener instrumentation
*/
public class EventPublishingRunListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.boot.context.event.EventPublishingRunListener";

@ -30,6 +30,9 @@ import org.springframework.context.ConfigurableApplicationContext;
import java.lang.reflect.Method;
/**
* Event publishing started interceptor
*/
public class EventPublishingStartedInterceptor implements InstanceMethodsAroundInterceptor {
private static final ILog LOGGER = LogManager.getLogger(EventPublishingStartedInterceptor.class);

@ -22,22 +22,37 @@ import cn.hippo4j.agent.core.boot.SpringBootConfigNode;
import java.util.Arrays;
import java.util.List;
/**
* Spring boot config
*/
public class SpringBootConfig {
/**
* Spring
*/
public static class Spring {
/**
* Dynamic
*/
public static class Dynamic {
/**
* ThreadPool
*/
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Thread_Pool {
/**
* Apollo
*/
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Apollo {
public static List<String> NAMESPACE = Arrays.asList("application");
public static final List<String> NAMESPACE = Arrays.asList("application");
}
public static String CONFIG_FILE_TYPE;
public static final String CONFIG_FILE_TYPE = null;
}
}
}

@ -27,6 +27,9 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.lang.reflect.Method;
/**
* Event publishing run listener environment prepared interceptor
*/
public class EventPublishingRunListenerEnvironmentPreparedInterceptor implements InstanceMethodsAroundInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishingRunListenerEnvironmentPreparedInterceptor.class);

@ -56,6 +56,9 @@ import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
/**
* Abstract dynamic thread poo change handler spring
*/
public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements IDynamicThreadPoolChangeHandlerSpring {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDynamicThreadPoolChangeHandlerSpring.class);

@ -17,6 +17,9 @@
package cn.hippo4j.agent.plugin.spring.common.support;
/**
* IDynamic thread pool change handler spring
*/
public interface IDynamicThreadPoolChangeHandlerSpring {
void registerApolloConfigHandler();

@ -23,6 +23,9 @@ import org.springframework.core.env.MapPropertySource;
import java.util.HashMap;
import java.util.Map;
/**
* Spring environment support
*/
public class SpringEnvironmentSupport {
public static void disableNonAgentSwitch(ConfigurableEnvironment environment) {

@ -29,6 +29,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
/**
* Spring properties loader
*/
public class SpringPropertiesLoader {
private static final ILog LOGGER = LogManager.getLogger(SpringPropertiesLoader.class);

@ -20,6 +20,7 @@ package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.core.registry.AgentThreadPoolInstanceRegistry;
import cn.hippo4j.agent.core.util.ReflectUtil;
import cn.hippo4j.agent.core.util.ThreadPoolPropertyKey;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.BooleanUtil;
@ -33,6 +34,9 @@ import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Spring thread pool register support
*/
public class SpringThreadPoolRegisterSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringThreadPoolRegisterSupport.class);
@ -71,7 +75,7 @@ public class SpringThreadPoolRegisterSupport {
properties.put(ThreadPoolPropertyKey.QUEUE_CAPACITY, executor.getQueue().remainingCapacity());
properties.put(ThreadPoolPropertyKey.THREAD_NAME_PREFIX, threadPoolId);
properties.put(ThreadPoolPropertyKey.REJECTED_HANDLER, RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName());
properties.put(ThreadPoolPropertyKey.EXECUTE_TIME_OUT, 10000L);
properties.put(ThreadPoolPropertyKey.EXECUTE_TIME_OUT, Constants.EXECUTE_TIME_OUT);
// register executor.
AgentThreadPoolInstanceRegistry.getInstance().putHolder(threadPoolId, executor, properties);

@ -27,6 +27,9 @@ import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
/**
* Thread pool executor instrumentation
*/
public class ThreadPoolExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "java.util.concurrent.ThreadPoolExecutor";

@ -31,12 +31,17 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Thread pool executor constructor method interceptor
*/
public class ThreadPoolExecutorConstructorMethodInterceptor implements InstanceConstructorInterceptor {
private static final ILog LOGGER = LogManager.getLogger(ThreadPoolExecutorConstructorMethodInterceptor.class);
private static final List<String> EXCLUDE_STACK_TRACE_ELEMENT_CLASS_PREFIX = Arrays.asList("java", "cn.hippo4j.agent");
private static final int INITIAL_CAPACITY = 3;
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
@ -64,8 +69,8 @@ public class ThreadPoolExecutorConstructorMethodInterceptor implements InstanceC
}
}
List<StackTraceElement> result = new ArrayList<>(3); // Find up to three layers
for (int j = 0; i < stackTraceElements.length && j < 3; i++, j++) {
List<StackTraceElement> result = new ArrayList<>(INITIAL_CAPACITY); // Find up to three layers
for (int j = 0; i < stackTraceElements.length && j < INITIAL_CAPACITY; i++, j++) {
String fullClassName = stackTraceElements[i].getClassName();
if (isExcludeThreadPoolClass(fullClassName)) {
break;

@ -29,4 +29,5 @@
<suppress checks="MagicNumber" files="UndertowWebThreadPoolHandler.java"/>
<suppress checks="MagicNumber" files="DashboardServiceImpl.java"/>
<suppress checks="MagicNumber" files="DefaultThreadPoolCheckAlarmHandler.java"/>
<suppress checks="TypeName" files="SpringBootConfig.java"/>
</suppressions>

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config apollo spring boot 1x example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigApolloSpringBoot1xExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config apollo example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigApolloExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config consul example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example")
public class ConfigConsulExampleApplication {

@ -17,6 +17,7 @@
package cn.hippo4j.example.config.etcd.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@ -29,6 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor;
* @date : 2022/9/2 19:18
* @description:
*/
@Slf4j
@RestController
@RequestMapping
public class TestController {
@ -38,6 +40,6 @@ public class TestController {
@GetMapping("test")
public void test() {
System.out.println(messageConsumeDynamicExecutor.getMaximumPoolSize());
log.info(String.valueOf(messageConsumeDynamicExecutor.getMaximumPoolSize()));
}
}

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config nacos spring boot 1.5 example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigNacosSpringBoot15ExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config nacos example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigNacosExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Config zookeeper example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class ConfigZookeeperExampleApplication {

@ -50,6 +50,16 @@ public class DynamicThreadPoolConfig {
public static final ThreadPoolExecutor FIELD2 = new ThreadPoolExecutor(10, 20,
1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(20));
private static final long EXECUTE_TIMEOUT = 800L;
private static final long AWAIT_TERMINATION_MILLIS = 5000L;
private static final int MAX_QUEUE_CAPACITY = 200;
private static final int CORE_POOL_SIZE = AVAILABLE_PROCESSORS * 2;
private static final int MAX_POOL_SIZE = AVAILABLE_PROCESSORS * 4;
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlDynamicThreadPool() {
@ -58,9 +68,9 @@ public class DynamicThreadPoolConfig {
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(800L)
.executeTimeOut(EXECUTE_TIMEOUT)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(5000L)
.awaitTerminationMillis(AWAIT_TERMINATION_MILLIS)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
// Ali ttl adaptation use case.
@ -86,10 +96,9 @@ public class DynamicThreadPoolConfig {
public ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("test-spring-task-executor_");
int maxQueueCapacity = 200;
threadPoolTaskExecutor.setCorePoolSize(AVAILABLE_PROCESSORS * 2);
threadPoolTaskExecutor.setMaxPoolSize(AVAILABLE_PROCESSORS * 4);
threadPoolTaskExecutor.setQueueCapacity(maxQueueCapacity);
threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
threadPoolTaskExecutor.setQueueCapacity(MAX_QUEUE_CAPACITY);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setTaskDecorator(new TaskDecoratorTest.ContextCopyingDecorator());
return threadPoolTaskExecutor;

@ -29,9 +29,11 @@ import java.util.concurrent.ThreadPoolExecutor;
*/
public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecutionHandler {
private static final int REJECTED_TYPE = 12;
@Override
public Integer getType() {
return 12;
return REJECTED_TYPE;
}
@Override
@ -39,6 +41,9 @@ public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecution
return new CustomErrorLogRejectedExecutionHandler();
}
/**
* Custom Error Log Rejected Execution Handler
*/
public static class CustomErrorLogRejectedExecutionHandler implements RejectedExecutionHandler {
@Override

@ -35,6 +35,10 @@ import java.util.concurrent.TimeUnit;
@Component
public class AlarmSendMessageTest {
private static final int SLEEP_TIME = 10240124;
private static final int INITIAL_DELAY = 3;
/**
* Test alarm notification.
* If you need to run this single test, add @PostConstruct to the method.
@ -48,7 +52,7 @@ public class AlarmSendMessageTest {
try {
poolExecutor.execute(() -> {
try {
Thread.sleep(10240124);
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -56,6 +60,6 @@ public class AlarmSendMessageTest {
} catch (Exception ex) {
log.error("Throw reject policy.", ex.getMessage());
}
}, 3, 1, TimeUnit.SECONDS);
}, INITIAL_DELAY, 1, TimeUnit.SECONDS);
}
}

@ -35,33 +35,41 @@ import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class RegisterDynamicThreadPoolTest {
private static final int CAPACITY = 1024;
private static final long KEEP_ALIVE_TIME = 1024L;
private static final long EXECUTE_TIMEOUT = 1024L;
private static final int CAPACITY_ALARM = 90;
private static final int ACTIVE_ALARM = 90;
private static final int CORE_NOTIFY_INTERVAL = 5;
private static final int SERVER_NOTIFY_INTERVAL = 10;
public static ThreadPoolExecutor registerDynamicThreadPool(String threadPoolId) {
DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder()
.corePoolSize(1)
.maximumPoolSize(2)
.blockingQueueType(BlockingQueueTypeEnum.LINKED_BLOCKING_QUEUE)
.capacity(1024)
.capacity(CAPACITY)
// TimeUnit.SECONDS
.keepAliveTime(1024L)
.keepAliveTime(KEEP_ALIVE_TIME)
// TimeUnit.MILLISECONDS
.executeTimeOut(1024L)
.executeTimeOut(EXECUTE_TIMEOUT)
.rejectedPolicyType(RejectedPolicyTypeEnum.DISCARD_POLICY)
.isAlarm(true)
.allowCoreThreadTimeOut(false)
.capacityAlarm(90)
.activeAlarm(90)
.capacityAlarm(CAPACITY_ALARM)
.activeAlarm(ACTIVE_ALARM)
.threadPoolId(threadPoolId)
.threadNamePrefix(threadPoolId)
.build();
// Core mode and server mode, you can choose one of them.
DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder()
.receives("chen.ma")
.interval(5)
.interval(CORE_NOTIFY_INTERVAL)
.build();
DynamicThreadPoolRegisterServerNotifyParameter serverNotifyParameter = DynamicThreadPoolRegisterServerNotifyParameter.builder()
.platform(NotifyPlatformEnum.WECHAT.name())
.accessToken("7487d0a0-20ec-40ab-b67b-ce68db406b37")
.interval(10)
.interval(SERVER_NOTIFY_INTERVAL)
.receives("chen.ma")
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()

@ -44,6 +44,12 @@ public class RunStateHandlerTest {
@Resource
private ThreadPoolExecutor messageProduceDynamicThreadPool;
private static final int MAX_RANDOM = 10;
private static final int SLEEP_500 = 500;
private static final int SLEEP_1000 = 1000;
private static final int SLEEP_3000 = 3000;
private static final int SLEEP_5000 = 5000;
/*
* @Resource private ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor;
*/
@ -82,7 +88,7 @@ public class RunStateHandlerTest {
*/
MDC.put(EXECUTE_TIMEOUT_TRACE, "39948722194639841.251.16612352194691531");
try {
Thread.sleep(5000);
Thread.sleep(SLEEP_5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -90,14 +96,14 @@ public class RunStateHandlerTest {
try {
executor.execute(() -> {
try {
int maxRandom = 10;
int maxRandom = MAX_RANDOM;
int temp = 2;
Random random = new Random();
// Assignment thread pool completedTaskCount
if (random.nextInt(maxRandom) % temp == 0) {
Thread.sleep(1000);
Thread.sleep(SLEEP_1000);
} else {
Thread.sleep(3000);
Thread.sleep(SLEEP_3000);
}
} catch (InterruptedException ignored) {
}
@ -105,7 +111,7 @@ public class RunStateHandlerTest {
} catch (Exception ignored) {
}
try {
Thread.sleep(500);
Thread.sleep(SLEEP_500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

@ -38,6 +38,8 @@ public class TaskDecoratorTest {
public static final String PLACEHOLDER = "site";
private static final int SLEEP_TIME = 5000;
private final ThreadPoolExecutor taskDecoratorTestExecutor = new ThreadPoolExecutor(
1,
1,
@ -60,7 +62,7 @@ public class TaskDecoratorTest {
taskDecoratorTestExecutor.execute(() -> {
MDC.put(PLACEHOLDER, "View the official website: https://www.hippo4j.cn");
try {
Thread.sleep(5000);
Thread.sleep(SLEEP_TIME);
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE);
ThreadPoolExecutor threadPoolExecutor = poolWrapper.getExecutor();
threadPoolExecutor.execute(() -> log.info("Pass context via taskDecorator MDC: {}", MDC.get(PLACEHOLDER)));
@ -70,6 +72,9 @@ public class TaskDecoratorTest {
});
}
/**
* Context Copying Decorator
*/
public static class ContextCopyingDecorator implements TaskDecorator {
@Override

@ -39,6 +39,8 @@ public class MessageProduce {
public static final String TOPIC = "test";
private static final int SLEEP_TIME = 3;
private final KafkaTemplate template;
@GetMapping("/message/send")
@ -54,9 +56,9 @@ public class MessageProduce {
String message = UUID.randomUUID().toString();
template.send(TOPIC, "autoTestMessage " + message);
try {
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
});

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server adapter kafka example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.kafka.example"})
public class ServerAdapterKafkaExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server Adapter RabbitMQ Example Application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.rabbitmq.example"})
public class ServerAdapterRabbitMQExampleApplication {

@ -31,15 +31,23 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class RabbitMQThreadPoolConfig {
private static final int MAX_POOL_SIZE = 5;
private static final int CORE_POOL_SIZE = 5;
private static final int QUEUE_CAPACITY = 1000;
private static final int CONSUMERS_PER_QUEUE = 10;
@Bean
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Specify the maximum number of threads.
executor.setMaxPoolSize(5);
executor.setMaxPoolSize(MAX_POOL_SIZE);
// Specifies the minimum number of thread pool maintenance threads.
executor.setCorePoolSize(5);
executor.setCorePoolSize(CORE_POOL_SIZE);
// Specifies the number of tasks waiting to be processed.
executor.setQueueCapacity(1000);
executor.setQueueCapacity(QUEUE_CAPACITY);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
return executor;
}
@ -50,7 +58,7 @@ public class RabbitMQThreadPoolConfig {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(abstractConnectionFactory);
factory.setMessageConverter(messageConverter);
factory.setConsumersPerQueue(10);
factory.setConsumersPerQueue(CONSUMERS_PER_QUEUE);
abstractConnectionFactory.setExecutor(rabbitListenerTaskExecutor);
return factory;
}

@ -20,7 +20,7 @@ package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants;
/**
* Simple MQ constant.
*/
public interface SimpleMQConstant {
public class SimpleMQConstant {
String QUEUE_NAME = "framework_message-center_queue";
public static final String QUEUE_NAME = "framework_message-center_queue";
}

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server adapter rocketMQ example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.rocketmq.example"})
public class ServerAdapterRocketMQExampleApplication {

@ -37,10 +37,12 @@ public class MessageProduce {
private final StreamBridge streamBridge;
private static final int MAX_SEND_SIZE = 10;
@GetMapping("/message/send")
public String sendMessage(@RequestParam(required = false) Integer maxSendSize) {
if (maxSendSize == null) {
maxSendSize = 10;
maxSendSize = MAX_SEND_SIZE;
}
for (int i = 0; i < maxSendSize; i++) {
sendMessage0();

@ -29,6 +29,9 @@ import org.springframework.messaging.MessageHeaders;
import java.util.function.Consumer;
/**
* Server Adapter Spring Cloud Stream RabbitMQ Application
*/
@Slf4j
@EnableDynamicThreadPool
@SpringBootApplication

@ -39,6 +39,7 @@ public class MessageConsume {
long startTime = System.currentTimeMillis();
try {
// ignore
log.info("Message: {}", JSONUtil.toJSONString(message));
} finally {
log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime,
JSONUtil.toJSONString(message));
@ -51,6 +52,7 @@ public class MessageConsume {
long startTime = System.currentTimeMillis();
try {
// ignore
log.info("Message: {}", JSONUtil.toJSONString(message));
} finally {
log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime,
JSONUtil.toJSONString(message));

@ -43,10 +43,13 @@ public class MessageProduce {
public static final String MESSAGE_CENTER_SAVE_MESSAGE_TAG = "framework_message-center_save-message_tag";
private static final int MAX_SEND_SIZE = 10;
private static final long SEND_TIMEOUT = 2000L;
@GetMapping("/message/send")
public String sendMessage() {
int maxSendSize = 10;
for (int i = 0; i < maxSendSize; i++) {
for (int i = 0; i < MAX_SEND_SIZE; i++) {
sendMessage(MESSAGE_CENTER_SEND_MESSAGE_TAG);
sendMessage(MESSAGE_CENTER_SAVE_MESSAGE_TAG);
}
@ -67,7 +70,7 @@ public class MessageProduce {
long startTime = System.currentTimeMillis();
boolean sendResult = false;
try {
sendResult = output.send(message, 2000L);
sendResult = output.send(message, SEND_TIMEOUT);
} finally {
log.info("Send status: {}, Keys: {}, Execute time: {} ms, Message: {}",
sendResult,

@ -24,6 +24,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
/**
* Server Adapter Spring Cloud Stream RocketMQ Application
*/
@Slf4j
@EnableDynamicThreadPool
@EnableBinding({Source.class, MySink.class})

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server es monitor example application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.monitor"})
public class ServerEsMonitorExampleApplication {

@ -21,6 +21,9 @@ import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Server Example Application
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.example.server"})
public class ServerExampleApplication {

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.constant;
public class MagicNumberConstants {
public static final int INDEX_0 = 0;
public static final int INDEX_1 = 1;
public static final int INDEX_2 = 2;
public static final int INDEX_3 = 3;
public static final int SIZE_4 = 4;
}

@ -46,6 +46,9 @@ import javax.servlet.Servlet;
@AutoConfigureBefore(WebThreadPoolHandlerConfiguration.class)
public class WebThreadPoolHandlerConfiguration1x {
/**
* Embedded tomcat
*/
@Configuration
@ConditionalOnClass({Servlet.class, Tomcat.class})
@ConditionalOnBean(WebThreadPoolRunStateHandler.class)

@ -68,6 +68,9 @@ public class ConfigHandlerConfiguration {
return new DefaultBootstrapConfigPropertiesBinderAdapt();
}
/**
* Embedded Nacos
*/
@RequiredArgsConstructor
@ConditionalOnClass(value = ConfigService.class, name = NACOS_INJECTED_BEAN_NAME)
@ConditionalOnMissingClass(NACOS_CONFIG_MANAGER_KEY)
@ -80,6 +83,9 @@ public class ConfigHandlerConfiguration {
}
}
/**
* Embedded Nacos Cloud
*/
@ConditionalOnClass(NacosConfigProperties.class)
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, name = NACOS_DATA_ID_KEY)
static class EmbeddedNacosCloud {
@ -90,6 +96,9 @@ public class ConfigHandlerConfiguration {
}
}
/**
* Embedded Apollo
*/
@ConditionalOnClass(com.ctrip.framework.apollo.ConfigService.class)
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, name = APOLLO_NAMESPACE_KEY)
static class EmbeddedApollo {
@ -100,6 +109,9 @@ public class ConfigHandlerConfiguration {
}
}
/**
* Embedded Consul
*/
@ConditionalOnClass(ConsulConfigProperties.class)
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, name = CONSUL_DATA_KEY)
static class EmbeddedConsul {
@ -110,6 +122,9 @@ public class ConfigHandlerConfiguration {
}
}
/**
* Embedded Zookeeper
*/
@ConditionalOnClass(CuratorFramework.class)
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, name = ZOOKEEPER_CONNECT_STR_KEY)
static class EmbeddedZookeeper {
@ -120,6 +135,9 @@ public class ConfigHandlerConfiguration {
}
}
/**
* Embedded Etcd
*/
@ConditionalOnClass(Client.class)
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, name = ETCD)
static class EmbeddedEtcd {
@ -130,6 +148,9 @@ public class ConfigHandlerConfiguration {
}
}
/**
* Polaris
*/
@ConditionalOnClass(ConfigFileService.class)
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, name = POLARIS)
static class Polaris {

@ -53,6 +53,8 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
private final WebThreadPoolService webThreadPoolService;
private static final int DEFAULT_INTERVAL = 5;
@Override
public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
@ -145,7 +147,7 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
.orElse(Optional.ofNullable(configProperties.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getInterval)
.orElse(5));
.orElse(DEFAULT_INTERVAL));
}
private String buildReceive(IExecutorProperties executorProperties) {

@ -27,7 +27,7 @@ import java.util.ServiceLoader;
/**
* Config parser handler.
*/
public class ConfigParserHandler {
public final class ConfigParserHandler {
private static final List<ConfigParser> PARSERS = new ArrayList<>();
@ -53,6 +53,9 @@ public class ConfigParserHandler {
return ConfigParserHandlerHolder.INSTANCE;
}
/**
* Config Parser Handler Holder
*/
private static class ConfigParserHandlerHolder {
private static final ConfigParserHandler INSTANCE = new ConfigParserHandler();

@ -39,6 +39,8 @@ import java.util.stream.Collectors;
@Slf4j
public class ConsulRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
private static final int INITIAL_CAPACITY = 64;
@EventListener(EnvironmentChangeEvent.class)
public void refreshed(EnvironmentChangeEvent event) {
Map<String, Object> configInfo = extractLatestConfigInfo(event);
@ -54,7 +56,7 @@ public class ConsulRefresherHandler extends AbstractConfigThreadPoolDynamicRefre
.map(propertySource -> (BootstrapPropertySource<?>) propertySource).collect(Collectors.toList());
Optional<BootstrapPropertySource<?>> bootstrapPropertySource = bootstrapPropertySourceList.stream()
.filter(source -> source.getName().contains(activeProfile) && source.getPropertyNames().length != 0).findFirst();
Map<String, Object> configInfo = new HashMap<>(64);
Map<String, Object> configInfo = new HashMap<>(INITIAL_CAPACITY);
if (bootstrapPropertySource.isPresent()) {
ConsulPropertySource consulPropertySource = (ConsulPropertySource) bootstrapPropertySource.get().getDelegate();
String[] propertyNames = consulPropertySource.getPropertyNames();

@ -115,4 +115,4 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
.build() : clientBuilder.build();
}
}
}
}

@ -41,21 +41,25 @@ import java.util.Map;
@Slf4j
public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
static final String ZK_CONNECT_STR = "zk-connect-str";
private static final String ZK_CONNECT_STR = "zk-connect-str";
static final String ROOT_NODE = "root-node";
private static final String ROOT_NODE = "root-node";
static final String CONFIG_VERSION = "config-version";
private static final String CONFIG_VERSION = "config-version";
static final String NODE = "node";
private static final String NODE = "node";
private CuratorFramework curatorFramework;
private static final int BASE_SLEEP_TIME_MS = 1000;
private static final int MAX_RETRIES = 3;
@Override
public void initRegisterListener() {
Map<String, String> zkConfigs = bootstrapConfigProperties.getZookeeper();
curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get(ZK_CONNECT_STR),
new ExponentialBackoffRetry(1000, 3));
new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get(ROOT_NODE),
zkConfigs.get(CONFIG_VERSION)), zkConfigs.get(NODE));
final ConnectionStateListener connectionStateListener = (client, newState) -> {

@ -55,4 +55,4 @@ public abstract class AbstractRefreshListener<M> implements RefreshListener<Hipp
protected String getNodes(M properties) {
return WebIpAndPortHolder.ALL;
}
}
}

@ -53,9 +53,9 @@ public class AdapterExecutorsRefreshListener extends AbstractRefreshListener<Ada
@Override
public void onApplicationEvent(Hippo4jConfigDynamicRefreshEvent event) {
List<AdapterExecutorProperties> adapterExecutors;
List<AdapterExecutorProperties> adapterExecutors = event.getBootstrapConfigProperties().getAdapterExecutors();
Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
if (CollectionUtil.isEmpty(adapterExecutors = event.getBootstrapConfigProperties().getAdapterExecutors()) || CollectionUtil.isEmpty(threadPoolAdapterMap)) {
if (CollectionUtil.isEmpty(adapterExecutors) || CollectionUtil.isEmpty(threadPoolAdapterMap)) {
return;
}
for (AdapterExecutorProperties each : adapterExecutors) {

@ -20,13 +20,13 @@ package cn.hippo4j.config.springboot.starter.refresher.event;
/**
* Hippo-4j config dynamic refresh event order.
*/
public interface Hippo4jConfigDynamicRefreshEventOrder {
public class Hippo4jConfigDynamicRefreshEventOrder {
int WEB_EXECUTOR_LISTENER = 0;
public static final int WEB_EXECUTOR_LISTENER = 0;
int PLATFORMS_LISTENER = 1;
public static final int PLATFORMS_LISTENER = 1;
int EXECUTORS_LISTENER = 2;
public static final int EXECUTORS_LISTENER = 2;
int ADAPTER_EXECUTORS_LISTENER = 3;
public static final int ADAPTER_EXECUTORS_LISTENER = 3;
}

@ -18,6 +18,7 @@
package cn.hippo4j.config.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.ReflectUtil;
@ -54,6 +55,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final BootstrapConfigProperties configProperties;
private static final int DEFAULT_ACTIVE_ALARM = 80;
private static final int DEFAULT_CAPACITY_ALARM = 80;
private static final int DEFAULT_INTERVAL = 5;
private static final String DEFAULT_RECEIVES = "";
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
@ -76,8 +85,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean;
}
DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) {
DynamicThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
if (dynamicThreadPoolExecutor == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
}
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
@ -156,7 +165,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.setAllowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut())
.setKeepAliveTime(executor.getKeepAliveTime(TimeUnit.SECONDS))
.setBlockingQueue(queueType)
.setExecuteTimeOut(10000L)
.setExecuteTimeOut(Constants.EXECUTE_TIME_OUT)
.setQueueCapacity(queueCapacity)
.setRejectedHandler(executor.getRejectedExecutionHandler().getClass().getSimpleName())
.setThreadPoolId(threadPoolId);
@ -233,15 +242,15 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(80));
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(DEFAULT_ACTIVE_ALARM));
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(80));
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(DEFAULT_CAPACITY_ALARM));
int interval = Optional.ofNullable(notify)
.map(ExecutorNotifyProperties::getInterval)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getInterval).orElse(5));
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getInterval).orElse(DEFAULT_INTERVAL));
String receive = Optional.ofNullable(notify)
.map(ExecutorNotifyProperties::getReceives)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getReceives).orElse(""));
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getReceives).orElse(DEFAULT_RECEIVES));
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive);

@ -32,6 +32,10 @@ import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS;
@Configuration
public class CommonConfig {
private static final int DEFAULT_QUEUE_CAPACITY = 4096;
private static final int DEFAULT_AWAIT_TERMINATION_MILLIS = 5000;
@Bean
@ConditionalOnMissingBean
public ApplicationContextHolder hippo4jApplicationContextHolder() {
@ -45,9 +49,9 @@ public class CommonConfig {
monitorThreadPool.setThreadNamePrefix("server.monitor.executor.");
monitorThreadPool.setCorePoolSize(AVAILABLE_PROCESSORS);
monitorThreadPool.setMaxPoolSize(AVAILABLE_PROCESSORS << 1);
monitorThreadPool.setQueueCapacity(4096);
monitorThreadPool.setQueueCapacity(DEFAULT_QUEUE_CAPACITY);
monitorThreadPool.setAllowCoreThreadTimeOut(true);
monitorThreadPool.setAwaitTerminationMillis(5000);
monitorThreadPool.setAwaitTerminationMillis(DEFAULT_AWAIT_TERMINATION_MILLIS);
return monitorThreadPool;
}
}

@ -33,7 +33,7 @@ import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = ServerBootstrapProperties.PREFIX)
public class ServerBootstrapProperties {
public final static String PREFIX = "hippo4j.core";
public static final String PREFIX = "hippo4j.core";
/**
* Whether to start the background task of cleaning up thread pool history data.

@ -17,20 +17,23 @@
package cn.hippo4j.config.event;
import lombok.Getter;
/**
* Local data change event.
*/
@Getter
public class LocalDataChangeEvent extends AbstractEvent {
/**
* Tenant + Item + Thread-pool
*/
public final String groupKey;
private final String groupKey;
/**
* Client instance unique identifier
*/
public final String identify;
private final String identify;
public LocalDataChangeEvent(String identify, String groupKey) {
this.identify = identify;

@ -64,6 +64,9 @@ public interface HisRunDataMapper extends BaseMapper<HisRunDataInfo> {
+ "limit 4")
List<ThreadPoolTaskRanking> queryThreadPoolMaxRanking(@Param("startTime") Long startTime, @Param("endTime") Long endTime);
/**
* Thread Pool Task Ranking
*/
@Data
class ThreadPoolTaskRanking {

@ -33,13 +33,13 @@ public class CacheItem {
final String groupKey;
public volatile String md5 = Constants.NULL;
private volatile String md5 = Constants.NULL;
public volatile long lastModifiedTs;
private volatile long lastModifiedTs;
public volatile ConfigAllInfo configAllInfo;
private volatile ConfigAllInfo configAllInfo;
public SimpleReadWriteLock rwLock = new SimpleReadWriteLock();
private SimpleReadWriteLock rwLock = new SimpleReadWriteLock();
public CacheItem(String groupKey) {
this.groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);

@ -21,6 +21,9 @@ import lombok.Data;
import javax.validation.constraints.Pattern;
/**
* Config Modify Save Req DTO
*/
@Data
public class ConfigModifySaveReqDTO {

@ -20,8 +20,6 @@ package cn.hippo4j.config.model.biz.threadpool;
import com.fasterxml.jackson.annotation.JsonAlias;
import lombok.Data;
import java.util.List;
/**
* Config modify verify dto
*/

@ -48,6 +48,12 @@ public class DefaultPublisher extends Thread implements EventPublisher {
protected volatile Long lastEventSequence = -1L;
private static final int DEFAULT_QUEUE_MAX_SIZE = -1;
private static final int DEFAULT_WAIT_TIMES = 60;
private static final long SLEEP_1S = 1000L;
private static final AtomicReferenceFieldUpdater<DefaultPublisher, Long> UPDATER = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
@ -64,8 +70,8 @@ public class DefaultPublisher extends Thread implements EventPublisher {
public synchronized void start() {
if (!initialized) {
super.start();
if (queueMaxSize == -1) {
queueMaxSize = NotifyCenter.ringBufferSize;
if (queueMaxSize == DEFAULT_QUEUE_MAX_SIZE) {
queueMaxSize = NotifyCenter.RING_BUFFER_SIZE;
}
initialized = true;
}
@ -78,13 +84,13 @@ public class DefaultPublisher extends Thread implements EventPublisher {
private void openEventHandler() {
try {
int waitTimes = 60;
int waitTimes = DEFAULT_WAIT_TIMES;
for (;;) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
try {
Thread.sleep(1000L);
Thread.sleep(SLEEP_1S);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

@ -37,9 +37,9 @@ public class NotifyCenter {
private static final NotifyCenter INSTANCE = new NotifyCenter();
public static int ringBufferSize = 16384;
public static final int RING_BUFFER_SIZE = 16384;
public static int shareBufferSize = 1024;
public static final int SHARE_BUFFER_SIZE = 1024;
private DefaultSharePublisher sharePublisher;
@ -61,7 +61,7 @@ public class NotifyCenter {
}
};
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(AbstractSlowEvent.class, shareBufferSize);
INSTANCE.sharePublisher.init(AbstractSlowEvent.class, SHARE_BUFFER_SIZE);
}
public static void registerSubscriber(final AbstractSubscriber consumer) {
@ -73,20 +73,20 @@ public class NotifyCenter {
addSubscriber(consumer, subscribeType);
}
}
return;
}
final Class<? extends AbstractEvent> subscribeType = consumer.subscribeType();
if (ClassUtil.isAssignableFrom(AbstractSlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
} else {
final Class<? extends AbstractEvent> subscribeType = consumer.subscribeType();
if (ClassUtil.isAssignableFrom(AbstractSlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
addSubscriber(consumer, subscribeType);
}
}
addSubscriber(consumer, subscribeType);
}
private static void addSubscriber(final AbstractSubscriber consumer, Class<? extends AbstractEvent> subscribeType) {
final String topic = ClassUtil.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, RING_BUFFER_SIZE);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
publisher.addSubscriber(consumer);

@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_3;
/**
* Config cache service.
@ -54,7 +55,7 @@ import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATI
@Slf4j
public class ConfigCacheService {
private static ConfigService CONFIG_SERVICE;
private static ConfigService configService;
static {
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, new ClearConfigCache());
@ -83,9 +84,11 @@ public class ConfigCacheService {
*/
public static boolean checkTpId(String groupKey, String tpId, String clientIdentify) {
Map<String, CacheItem> cacheItemMap = Optional.ofNullable(CLIENT_CONFIG_CACHE.get(groupKey)).orElse(new HashMap<>());
CacheItem cacheItem;
if (CollectionUtil.isNotEmpty(cacheItemMap) && (cacheItem = cacheItemMap.get(clientIdentify)) != null) {
return Objects.equals(tpId, cacheItem.configAllInfo.getTpId());
if (CollectionUtil.isNotEmpty(cacheItemMap)) {
CacheItem cacheItem = cacheItemMap.get(clientIdentify);
if (cacheItem != null) {
return Objects.equals(tpId, cacheItem.getConfigAllInfo().getTpId());
}
}
return Boolean.FALSE;
}
@ -97,31 +100,34 @@ public class ConfigCacheService {
* @param clientIdentify
* @return
*/
private synchronized static String getContentMd5IsNullPut(String groupKey, String clientIdentify) {
private static synchronized String getContentMd5IsNullPut(String groupKey, String clientIdentify) {
Map<String, CacheItem> cacheItemMap = Optional.ofNullable(CLIENT_CONFIG_CACHE.get(groupKey)).orElse(new HashMap<>());
CacheItem cacheItem = null;
if (CollectionUtil.isNotEmpty(cacheItemMap) && (cacheItem = cacheItemMap.get(clientIdentify)) != null) {
return cacheItem.md5;
if (CollectionUtil.isNotEmpty(cacheItemMap)) {
cacheItem = cacheItemMap.get(clientIdentify);
if (cacheItem != null) {
return cacheItem.getMd5();
}
}
if (CONFIG_SERVICE == null) {
CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class);
if (configService == null) {
configService = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
ConfigAllInfo config = configService.findConfigRecentInfo(params);
if (config != null && StringUtil.isNotBlank(config.getTpId())) {
cacheItem = new CacheItem(groupKey, config);
cacheItemMap.put(clientIdentify, cacheItem);
CLIENT_CONFIG_CACHE.put(groupKey, cacheItemMap);
}
return (cacheItem != null) ? cacheItem.md5 : Constants.NULL;
return (cacheItem != null) ? cacheItem.getMd5() : Constants.NULL;
}
public static String getContentMd5(String groupKey) {
if (CONFIG_SERVICE == null) {
CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class);
if (configService == null) {
configService = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
ConfigAllInfo config = configService.findConfigRecentInfo(params);
if (config == null || StringUtils.isEmpty(config.getTpId())) {
String errorMessage = String.format("config is null. tpId: %s, itemId: %s, tenantId: %s", params[0], params[1], params[2]);
throw new RuntimeException(errorMessage);
@ -131,21 +137,23 @@ public class ConfigCacheService {
public static void updateMd5(String groupKey, String identify, String md5) {
CacheItem cache = makeSure(groupKey, identify);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
if (cache.getMd5() == null || !cache.getMd5().equals(md5)) {
cache.setMd5(md5);
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
cache.configAllInfo = config;
cache.lastModifiedTs = System.currentTimeMillis();
ConfigAllInfo config = configService.findConfigRecentInfo(params);
cache.setConfigAllInfo(config);
cache.setLastModifiedTs(System.currentTimeMillis());
NotifyCenter.publishEvent(new LocalDataChangeEvent(identify, groupKey));
}
}
public synchronized static CacheItem makeSure(String groupKey, String ip) {
public static synchronized CacheItem makeSure(String groupKey, String ip) {
Map<String, CacheItem> ipCacheItemMap = CLIENT_CONFIG_CACHE.get(groupKey);
CacheItem item;
if (ipCacheItemMap != null && (item = ipCacheItemMap.get(ip)) != null) {
return item;
if (ipCacheItemMap != null) {
CacheItem item = ipCacheItemMap.get(ip);
if (item != null) {
return item;
}
}
CacheItem tmp = new CacheItem(groupKey);
Map<String, CacheItem> cacheItemMap = new HashMap<>();
@ -176,7 +184,7 @@ public class ConfigCacheService {
for (String each : keys) {
String[] keyArray = each.split(GROUP_KEY_DELIMITER_TRANSLATION);
if (keyArray.length > 2) {
identifyList.add(keyArray[3]);
identifyList.add(keyArray[INDEX_3]);
}
}
}
@ -192,7 +200,7 @@ public class ConfigCacheService {
coarseRemove(groupKey);
}
private synchronized static void coarseRemove(String coarse) {
private static synchronized void coarseRemove(String coarse) {
// fuzzy search
List<String> identificationList = MapUtil.parseMapForFilter(CLIENT_CONFIG_CACHE, coarse);
for (String cacheMapKey : identificationList) {

@ -70,17 +70,23 @@ public class LongPollingService {
private final Map<String, Long> retainIps = new ConcurrentHashMap<>();
private static final long SCHEDULE_PERIOD = 30L;
private static final int MAX_TIMEOUT = 10000;
private static final int DEFAULT_DELAY_TIME = 500;
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 30L, TimeUnit.SECONDS);
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, SCHEDULE_PERIOD, TimeUnit.SECONDS);
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.RING_BUFFER_SIZE);
NotifyCenter.registerSubscriber(new AbstractSubscriber() {
@Override
public void onEvent(AbstractEvent event) {
if (!isFixedPolling() && event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.identify, evt.groupKey));
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.getIdentify(), evt.getGroupKey()));
}
}
@ -155,25 +161,30 @@ public class LongPollingService {
int probeRequestSize) {
String str = req.getHeader(LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LONG_POLLING_NO_HANG_UP_HEADER);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, DEFAULT_DELAY_TIME);
long timeout = Math.max(MAX_TIMEOUT, Long.parseLong(str) - delayTime);
boolean shouldReturn = false;
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
timeout = Math.max(MAX_TIMEOUT, getFixedPollingInterval());
} else {
List<String> changedGroups = Md5ConfigUtil.compareMd5(req, clientMd5Map);
if (!changedGroups.isEmpty()) {
generateResponse(rsp, changedGroups);
return;
shouldReturn = true;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
log.info("New initializing cacheData added in.");
return;
shouldReturn = true;
}
}
String clientIdentify = RequestUtil.getClientIdentify(req);
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize,
timeout - delayTime, Pair.of(req.getHeader(CLIENT_APP_NAME_HEADER), req.getHeader(CLIENT_VERSION))));
if (!shouldReturn) {
String clientIdentify = RequestUtil.getClientIdentify(req);
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize,
timeout - delayTime, Pair.of(req.getHeader(CLIENT_APP_NAME_HEADER), req.getHeader(CLIENT_VERSION))));
}
}
/**
@ -199,8 +210,8 @@ public class LongPollingService {
Future<?> asyncTimeoutFuture;
public ClientLongPolling(AsyncContext asyncContext, Map<String, String> clientMd5Map, String clientIdentify,
int probeRequestSize, long timeout, Pair<String, String> appInfo) {
ClientLongPolling(AsyncContext asyncContext, Map<String, String> clientMd5Map, String clientIdentify,
int probeRequestSize, long timeout, Pair<String, String> appInfo) {
this.asyncContext = asyncContext;
this.clientMd5Map = clientMd5Map;
this.clientIdentify = clientIdentify;

@ -140,6 +140,9 @@ public class ThreadPoolAdapterService {
}
}
/**
* Clear Thread Pool Adapter Cache
*/
static class ClearThreadPoolAdapterCache implements Observer<String> {
@Override

@ -67,6 +67,16 @@ import java.util.concurrent.SynchronousQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_0;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_1;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_2;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_3;
import static cn.hippo4j.common.executor.support.BlockingQueueTypeEnum.ARRAY_BLOCKING_QUEUE;
import static cn.hippo4j.common.executor.support.BlockingQueueTypeEnum.LINKED_BLOCKING_DEQUE;
import static cn.hippo4j.common.executor.support.BlockingQueueTypeEnum.LINKED_BLOCKING_QUEUE;
import static cn.hippo4j.common.executor.support.BlockingQueueTypeEnum.LINKED_TRANSFER_QUEUE;
import static cn.hippo4j.common.executor.support.BlockingQueueTypeEnum.PRIORITY_BLOCKING_QUEUE;
import static cn.hippo4j.common.executor.support.BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE;
import static cn.hippo4j.config.service.ConfigCacheService.getContent;
/**
@ -85,6 +95,8 @@ public class ConfigServiceImpl implements ConfigService {
private final NotifyService notifyService;
private static final int DEFAULT_QUEUE_CAPACITY = 1024;
@Override
public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenantId) {
LambdaQueryWrapper<ConfigAllInfo> wrapper = Wrappers.lambdaQuery(ConfigAllInfo.class)
@ -99,13 +111,13 @@ public class ConfigServiceImpl implements ConfigService {
public ConfigAllInfo findConfigRecentInfo(String... params) {
ConfigAllInfo resultConfig;
ConfigAllInfo configInstance = null;
String instanceId = params[3];
String instanceId = params[INDEX_3];
if (StringUtil.isNotBlank(instanceId)) {
LambdaQueryWrapper<ConfigInstanceInfo> instanceQueryWrapper = Wrappers.lambdaQuery(ConfigInstanceInfo.class)
.eq(ConfigInstanceInfo::getTpId, params[0])
.eq(ConfigInstanceInfo::getItemId, params[1])
.eq(ConfigInstanceInfo::getTenantId, params[2])
.eq(ConfigInstanceInfo::getInstanceId, params[3])
.eq(ConfigInstanceInfo::getTpId, params[INDEX_0])
.eq(ConfigInstanceInfo::getItemId, params[INDEX_1])
.eq(ConfigInstanceInfo::getTenantId, params[INDEX_2])
.eq(ConfigInstanceInfo::getInstanceId, params[INDEX_3])
.orderByDesc(ConfigInstanceInfo::getGmtCreate)
.last("LIMIT 1");
ConfigInstanceInfo instanceInfo = configInstanceMapper.selectOne(instanceQueryWrapper);
@ -216,6 +228,7 @@ public class ConfigServiceImpl implements ConfigService {
public Long addConfigInfo(ConfigAllInfo config) {
config.setContent(ContentUtil.getPoolContent(config));
config.setMd5(Md5Util.getTpContentMd5(config));
Long configId = null;
try {
// Currently it is a single application, and it supports switching distributed locks during cluster deployment in the future.
synchronized (ConfigService.class) {
@ -225,14 +238,14 @@ public class ConfigServiceImpl implements ConfigService {
.eq(ConfigAllInfo::getDelFlag, DelEnum.NORMAL.getIntCode()));
Assert.isNull(configAllInfo, "线程池配置已存在");
if (SqlHelper.retBool(configInfoMapper.insert(config))) {
return config.getId();
configId = config.getId();
}
}
} catch (Exception ex) {
log.error("[db-error] message: {}", ex.getMessage(), ex);
throw ex;
}
return null;
return configId;
}
public void updateConfigInfo(String identify, boolean isChangeNotice, ConfigAllInfo config) {
@ -251,7 +264,6 @@ public class ConfigServiceImpl implements ConfigService {
ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class);
instanceInfo.setInstanceId(identify);
configInstanceMapper.insert(instanceInfo);
return;
} else if (StringUtil.isEmpty(identify) && isChangeNotice) {
List<String> identifyList = ConfigCacheService.getIdentifyList(config.getTenantId(), config.getItemId(), config.getTpId());
if (CollectionUtil.isNotEmpty(identifyList)) {
@ -261,9 +273,9 @@ public class ConfigServiceImpl implements ConfigService {
configInstanceMapper.insert(instanceInfo);
}
}
return;
} else {
configInfoMapper.update(config, wrapper);
}
configInfoMapper.update(config, wrapper);
} catch (Exception ex) {
log.error("[db-error] message: {}", ex.getMessage(), ex);
throw ex;
@ -294,18 +306,20 @@ public class ConfigServiceImpl implements ConfigService {
*/
private Integer getQueueCapacityByType(ConfigAllInfo config) {
int queueCapacity;
switch (config.getQueueType()) {
case 5:
queueCapacity = Integer.MAX_VALUE;
break;
default:
queueCapacity = config.getCapacity();
break;
if (LINKED_TRANSFER_QUEUE.getType().equals(config.getQueueType())) {
queueCapacity = Integer.MAX_VALUE;
} else {
queueCapacity = config.getCapacity();
}
List<Integer> queueTypes = Stream.of(1, 2, 3, 6, 9).collect(Collectors.toList());
List<Integer> queueTypes = Stream.of(
ARRAY_BLOCKING_QUEUE.getType(),
LINKED_BLOCKING_QUEUE.getType(),
LINKED_BLOCKING_DEQUE.getType(),
PRIORITY_BLOCKING_QUEUE.getType(),
RESIZABLE_LINKED_BLOCKING_QUEUE.getType()).collect(Collectors.toList());
boolean setDefaultFlag = queueTypes.contains(config.getQueueType()) && (config.getCapacity() == null || Objects.equals(config.getCapacity(), 0));
if (setDefaultFlag) {
queueCapacity = 1024;
queueCapacity = DEFAULT_QUEUE_CAPACITY;
}
return queueCapacity;
}

@ -47,6 +47,10 @@ import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_0;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_1;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_2;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_3;
import static cn.hippo4j.common.toolkit.DateUtil.NORM_TIME_PATTERN;
/**
@ -151,12 +155,12 @@ public class HisRunDataServiceImpl extends ServiceImpl<HisRunDataMapper, HisRunD
runtimeMessages.forEach(each -> {
HisRunDataInfo hisRunDataInfo = BeanUtil.convert(each, HisRunDataInfo.class);
String[] parseKey = GroupKey.parseKey(each.getGroupKey());
boolean checkFlag = ConfigCacheService.checkTpId(each.getGroupKey(), parseKey[0], parseKey[3]);
boolean checkFlag = ConfigCacheService.checkTpId(each.getGroupKey(), parseKey[INDEX_0], parseKey[INDEX_3]);
if (checkFlag) {
hisRunDataInfo.setTpId(parseKey[0]);
hisRunDataInfo.setItemId(parseKey[1]);
hisRunDataInfo.setTenantId(parseKey[2]);
hisRunDataInfo.setInstanceId(parseKey[3]);
hisRunDataInfo.setTpId(parseKey[INDEX_0]);
hisRunDataInfo.setItemId(parseKey[INDEX_1]);
hisRunDataInfo.setTenantId(parseKey[INDEX_2]);
hisRunDataInfo.setInstanceId(parseKey[INDEX_3]);
hisRunDataInfos.add(hisRunDataInfo);
}
});

@ -31,9 +31,9 @@ public class EnvUtil {
public static final String STANDALONE_MODE_PROPERTY_NAME = "hippo4j.standalone";
private static String HIPPO4J_HOME_PATH = null;
private static String hippo4jHomePath = null;
private static Boolean IS_STANDALONE = null;
private static Boolean isStandalone = null;
/**
* Get hippo4j home.
@ -41,14 +41,13 @@ public class EnvUtil {
* @return
*/
public static String getHippo4jHome() {
if (StringUtil.isBlank(HIPPO4J_HOME_PATH)) {
String hippo4jHome = System.getProperty(HIPPO4J_HOME_KEY);
if (StringUtil.isBlank(hippo4jHome)) {
hippo4jHome = Paths.get(System.getProperty("user.home"), "hippo4j").toString();
if (StringUtil.isBlank(hippo4jHomePath)) {
hippo4jHomePath = System.getProperty(HIPPO4J_HOME_KEY);
if (StringUtil.isBlank(hippo4jHomePath)) {
hippo4jHomePath = Paths.get(System.getProperty("user.home"), "hippo4j").toString();
}
return hippo4jHome;
}
return HIPPO4J_HOME_PATH;
return hippo4jHomePath;
}
/**
@ -57,9 +56,9 @@ public class EnvUtil {
* @return
*/
public static boolean getStandaloneMode() {
if (Objects.isNull(IS_STANDALONE)) {
IS_STANDALONE = Boolean.getBoolean(STANDALONE_MODE_PROPERTY_NAME);
if (Objects.isNull(isStandalone)) {
isStandalone = Boolean.getBoolean(STANDALONE_MODE_PROPERTY_NAME);
}
return IS_STANDALONE;
return isStandalone;
}
}

@ -34,6 +34,11 @@ import java.util.Map;
import static cn.hippo4j.common.constant.Constants.LINE_SEPARATOR;
import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_0;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_1;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_2;
import static cn.hippo4j.common.constant.MagicNumberConstants.INDEX_3;
import static cn.hippo4j.common.constant.MagicNumberConstants.SIZE_4;
/**
* Md5 config util.
@ -44,6 +49,10 @@ public class Md5ConfigUtil {
static final char LINE_SEPARATOR_CHAR = (char) 1;
private static final int CLIENT_MD5_MAP_INIT_SIZE = 5;
private static final int CLIENT_MD5_MAP_MAX_SIZE = 10000;
private static final int CLIENT_MD5_TMP_LIST_INIT_SIZE = 3;
private static final int CLIENT_MD5_TMP_LIST_MAX_SIZE = 4;
/**
* Get thread pool content md5
*
@ -74,18 +83,18 @@ public class Md5ConfigUtil {
}
public static Map<String, String> getClientMd5Map(String configKeysString) {
Map<String, String> md5Map = new HashMap(5);
Map<String, String> md5Map = new HashMap(CLIENT_MD5_MAP_INIT_SIZE);
if (null == configKeysString || "".equals(configKeysString)) {
return md5Map;
}
int start = 0;
List<String> tmpList = new ArrayList(3);
List<String> tmpList = new ArrayList(CLIENT_MD5_TMP_LIST_INIT_SIZE);
for (int i = start; i < configKeysString.length(); i++) {
char c = configKeysString.charAt(i);
if (c == WORD_SEPARATOR_CHAR) {
tmpList.add(configKeysString.substring(start, i));
start = i + 1;
if (tmpList.size() > 4) {
if (tmpList.size() > CLIENT_MD5_TMP_LIST_MAX_SIZE) {
// Malformed message and return parameter error.
throw new IllegalArgumentException("invalid protocol,too much key");
}
@ -95,12 +104,12 @@ public class Md5ConfigUtil {
endValue = configKeysString.substring(start, i);
}
start = i + 1;
String groupKey = getKey(tmpList.get(0), tmpList.get(1), tmpList.get(2), tmpList.get(3));
String groupKey = getKey(tmpList.get(INDEX_0), tmpList.get(INDEX_1), tmpList.get(INDEX_2), tmpList.get(INDEX_3));
groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
md5Map.put(groupKey, endValue);
tmpList.clear();
// Protect malformed messages
if (md5Map.size() > 10000) {
if (md5Map.size() > CLIENT_MD5_MAP_MAX_SIZE) {
throw new IllegalArgumentException("invalid protocol, too much listener");
}
}
@ -140,7 +149,7 @@ public class Md5ConfigUtil {
sb.append(WORD_SEPARATOR);
sb.append(dataIdGroupId[1]);
// if have tenant, then set it
if (dataIdGroupId.length == 4) {
if (dataIdGroupId.length == SIZE_4) {
if (StringUtil.isNotBlank(dataIdGroupId[2])) {
sb.append(WORD_SEPARATOR);
sb.append(dataIdGroupId[2]);

@ -24,6 +24,8 @@ public class SimpleReadWriteLock {
private int status = 0;
private static final int FREE_STATUS = -1;
public synchronized boolean tryReadLock() {
if (isWriteLocked()) {
return false;
@ -41,7 +43,7 @@ public class SimpleReadWriteLock {
if (!isFree()) {
return false;
} else {
status = -1;
status = FREE_STATUS;
return true;
}
}

@ -24,8 +24,13 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class SingletonRepository<T> {
/**
* initialCapacity: 1 << 16
*/
private static final int INITIAL_CAPACITY = 65536;
public SingletonRepository() {
shared = new ConcurrentHashMap(1 << 16);
shared = new ConcurrentHashMap(INITIAL_CAPACITY);
}
public T getSingleton(T obj) {
@ -43,6 +48,9 @@ public class SingletonRepository<T> {
private final ConcurrentHashMap<T, T> shared;
/**
* Data Id Group Id Cache
*/
public static class DataIdGroupIdCache {
public static String getSingleton(String str) {

@ -26,7 +26,7 @@ public final class LocalDataChangeEventTest {
@Test
public void assertGetSingleton() {
LocalDataChangeEvent localDataChangeEvent = new LocalDataChangeEvent("groupKey", "identify");
Assert.isTrue(StringUtil.isNotEmpty(localDataChangeEvent.groupKey));
Assert.isTrue(StringUtil.isNotEmpty(localDataChangeEvent.identify));
Assert.isTrue(StringUtil.isNotEmpty(localDataChangeEvent.getGroupKey()));
Assert.isTrue(StringUtil.isNotEmpty(localDataChangeEvent.getIdentify()));
}
}

@ -214,7 +214,7 @@ public class ThreadPoolController {
List<ThreadPoolInstanceInfo> returnThreadPool = new ArrayList<>();
content.forEach((key, val) -> {
ThreadPoolInstanceInfo threadPoolInstanceInfo =
BeanUtil.convert(val.configAllInfo, ThreadPoolInstanceInfo.class);
BeanUtil.convert(val.getConfigAllInfo(), ThreadPoolInstanceInfo.class);
threadPoolInstanceInfo.setClientAddress(StringUtil.subBefore(key, Constants.IDENTIFY_SLICER_SYMBOL));
threadPoolInstanceInfo.setActive(activeMap.get(key));
threadPoolInstanceInfo.setIdentify(key);

Loading…
Cancel
Save