From 26dea183c6fd1927c522e64098b18a9ca7b4f134 Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 16:44:41 +0800 Subject: [PATCH] Add monitoring of Hystrix thread pools --- .../adapter/base/ThreadPoolAdapterExtra.java | 57 ++++++++++++ ...readPoolAdapterExtraAutoConfiguration.java | 34 ++++++++ .../base/ThreadPoolAdapterExtraHandle.java | 29 +++++++ .../main/resources/META-INF/spring.factories | 1 + .../hystrix/HystrixThreadPoolAdapter.java | 86 ++++++++++++++++--- .../config/config/NettyServerConfig.java | 27 ++++-- .../config/netty/MonitorNettyServer.java | 34 ++++++-- .../hippo4j/config/netty/ServerHandler.java | 17 ++++ ...Hippo4jAdapterKafkaExampleApplication.java | 17 ++++ .../consumer/KafkaMessageConsumer.java | 17 ++++ .../example/produce/KafkaMessageProduce.java | 17 ++++ .../HystrixAdapterAutoConfiguration.java | 25 +++++- .../DynamicThreadPoolAutoConfiguration.java | 9 +- .../config/NettyClientConfiguration.java | 19 +++- .../starter/core/DiscoveryClient.java | 5 +- .../core/ThreadPoolAdapterRegister.java | 11 ++- .../send/netty/NettyConnectSender.java | 28 ++++-- .../monitor/send/netty/SenderHandler.java | 17 ++++ .../starter/remote/ServerListManager.java | 2 +- .../starter/remote/ServerNettyAgent.java | 25 +++++- 20 files changed, 432 insertions(+), 45 deletions(-) create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java new file mode 100644 index 00000000..695f2402 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.base; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * thread pool adapter extra. + */ +@Slf4j +public class ThreadPoolAdapterExtra { + + private static final int BLOCKING_QUEUE_CAPACITY = 100; + + private BlockingQueue> blockingQueue; + + public ThreadPoolAdapterExtra() { + blockingQueue = new ArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY); + } + + public void offerQueue(Map map) throws InterruptedException { + blockingQueue.offer(map, 5, TimeUnit.SECONDS); + } + + public void extraStart(ThreadPoolAdapterExtraHandle threadPoolAdapterExtraHandle) { + new Thread(() -> { + try { + for (;;) { + Map map = blockingQueue.take(); + threadPoolAdapterExtraHandle.execute(map); + } + } catch (InterruptedException e) { + log.error("extraStart error", e); + } + }, "threadPoolAdapterExtra").start(); + } +} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java new file mode 100644 index 00000000..a856a52b --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.base; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * thread pool adapter extra auto configuration. + */ +@Configuration(proxyBeanMethods = false) +public class ThreadPoolAdapterExtraAutoConfiguration { + + @Bean + public ThreadPoolAdapterExtra threadPoolAdapterExtra() { + return new ThreadPoolAdapterExtra(); + } + +} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java new file mode 100644 index 00000000..a6cde2e7 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.base; + +import java.util.Map; + +/** + * Thread Pool Adapter Extra Handle + **/ +@FunctionalInterface +public interface ThreadPoolAdapterExtraHandle { + + void execute(Map map); +} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories b/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..4236a045 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration \ No newline at end of file diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index 1fc1e336..fd2bd316 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -18,20 +18,29 @@ package cn.hippo4j.adapter.hystrix; import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.hystrix.HystrixThreadPool; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; @@ -45,8 +54,25 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL private static final String THREAD_POOLS_FIELD = "threadPools"; + private static final int TASK_INTERVAL_SECONDS = 2; + private final Map HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); + private final ScheduledExecutorService scheduler; + + private ThreadPoolAdapterExtra threadPoolAdapterExtra; + + public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { + + this.threadPoolAdapterExtra = threadPoolAdapterExtra; + + scheduler = new ScheduledThreadPoolExecutor(2, + new ThreadFactoryBuilder() + .setNameFormat("hystrixThreadPoolAdapter") + .setDaemon(true) + .build()); + } + @Override public String mark() { return "hystrix"; @@ -55,17 +81,24 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { ThreadPoolAdapterState result = new ThreadPoolAdapterState(); - ThreadPoolExecutor rocketMQConsumeExecutor = HYSTRIX_CONSUME_EXECUTOR.get(identify); - if (rocketMQConsumeExecutor != null) { + ThreadPoolExecutor threadPoolExecutor = HYSTRIX_CONSUME_EXECUTOR.get(identify); + if (threadPoolExecutor != null) { result.setThreadPoolKey(identify); - result.setCoreSize(rocketMQConsumeExecutor.getCorePoolSize()); - result.setMaximumSize(rocketMQConsumeExecutor.getMaximumPoolSize()); + result.setCoreSize(threadPoolExecutor.getCorePoolSize()); + result.setMaximumSize(threadPoolExecutor.getMaximumPoolSize()); return result; } log.warn("[{}] hystrix thread pool not found.", identify); return result; } + @Override + public List getThreadPoolStates() { + List threadPoolAdapterStates = new ArrayList<>(); + HYSTRIX_CONSUME_EXECUTOR.forEach((kel, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(val)))); + return threadPoolAdapterStates; + } + @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); @@ -87,32 +120,65 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public void onApplicationEvent(ApplicationStartedEvent event) { + HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler); + scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + + public void hystrixThreadPoolRefresh() { try { + boolean addExtraFlag = false; Class factoryClass = HystrixThreadPool.Factory.class; Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); threadPoolsField.setAccessible(true); ConcurrentHashMap threadPools = - (ConcurrentHashMap)threadPoolsField.get(factoryClass); + (ConcurrentHashMap) threadPoolsField.get(factoryClass); if (CollectionUtil.isNotEmpty(threadPools)) { for (Map.Entry stringHystrixThreadPoolEntry : threadPools.entrySet()) { String key = stringHystrixThreadPoolEntry.getKey(); HystrixThreadPool value = stringHystrixThreadPoolEntry.getValue(); if (value instanceof HystrixThreadPool.HystrixThreadPoolDefault) { HystrixThreadPool.HystrixThreadPoolDefault hystrixThreadPoolDefault = - (HystrixThreadPool.HystrixThreadPoolDefault)value; + (HystrixThreadPool.HystrixThreadPoolDefault) value; Class hystrixThreadPoolDefaultClass = hystrixThreadPoolDefault.getClass(); Field threadPoolField = hystrixThreadPoolDefaultClass.getDeclaredField(THREAD_POOL_FIELD); threadPoolField.setAccessible(true); ThreadPoolExecutor threadPoolExecutor = - (ThreadPoolExecutor)threadPoolField.get(hystrixThreadPoolDefault); - if (threadPoolExecutor != null) { - HYSTRIX_CONSUME_EXECUTOR.put(key,threadPoolExecutor); + (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault); + if (threadPoolExecutor != null && HYSTRIX_CONSUME_EXECUTOR.get(key) == null) { + HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor); + addExtraFlag = true; } } } } - }catch (Exception e) { + if (addExtraFlag) { + Map map = Maps.newHashMap(); + map.putAll(ApplicationContextHolder.getBeansOfType(HystrixThreadPoolAdapter.class)); + threadPoolAdapterExtra.offerQueue(map); + } + } catch (Exception e) { log.error("Failed to get Hystrix thread pool.", e); } + + } + + class HystrixThreadPoolRefreshTask implements Runnable { + + private ScheduledExecutorService scheduler; + + public HystrixThreadPoolRefreshTask(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + @Override + public void run() { + try { + hystrixThreadPoolRefresh(); + } finally { + if (!scheduler.isShutdown()) { + scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS); + } + } + } } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java b/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java index 57373cd4..d326dff1 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.config.config; import cn.hippo4j.config.netty.MonitorNettyServer; @@ -11,12 +28,12 @@ import org.springframework.context.annotation.Configuration; public class NettyServerConfig { @Bean - public EventLoopGroup bossGroup(){ + public EventLoopGroup bossGroup() { return new NioEventLoopGroup(); } @Bean - public EventLoopGroup workGroup(){ + public EventLoopGroup workGroup() { return new NioEventLoopGroup(); } @@ -24,9 +41,7 @@ public class NettyServerConfig { public MonitorNettyServer monitorNettyServer(ServerBootstrapProperties serverBootstrapProperties, HisRunDataService hisRunDataService, EventLoopGroup bossGroup, - EventLoopGroup workGroup){ - return new MonitorNettyServer(serverBootstrapProperties,hisRunDataService,bossGroup,workGroup); + EventLoopGroup workGroup) { + return new MonitorNettyServer(serverBootstrapProperties, hisRunDataService, bossGroup, workGroup); } } - - diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java index b39e04e5..1a2d1a49 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.config.netty; import cn.hippo4j.config.config.ServerBootstrapProperties; @@ -44,16 +61,17 @@ public class MonitorNettyServer { private EventLoopGroup workGroup; @PostConstruct - public void nettyServerInit(){ + public void nettyServerInit() { new Thread(() -> { try { ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(bossGroup,workGroup) + serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) - //childHandler的任务由workGroup来执行 - //如果是handler,则由bossGroup来执行 - .childHandler(new ChannelInitializer(){ + // childHandler的任务由workGroup来执行 + // 如果是handler,则由bossGroup来执行 + .childHandler(new ChannelInitializer() { + @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); @@ -66,13 +84,13 @@ public class MonitorNettyServer { ChannelFuture channelFuture = serverBootstrap.bind(Integer.parseInt(serverBootstrapProperties.getNettyServerPort())).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { - log.error("nettyServerInit error",e); + log.error("nettyServerInit error", e); } - },"nettyServerInit thread").start(); + }, "nettyServerInit thread").start(); } @PreDestroy - public void destroy(){ + public void destroy() { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java index c8bce8a6..3f21fe5e 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.config.netty; import cn.hippo4j.common.monitor.Message; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java index 0cf6e45c..e4c0e80a 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.kafka.example; import cn.hippo4j.core.enable.EnableDynamicThreadPool; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java index 8f6a5821..43087cc9 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.kafka.example.consumer; import lombok.extern.slf4j.Slf4j; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java index 119fb92c..04d01c71 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.kafka.example.produce; import cn.hippo4j.common.toolkit.JSONUtil; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java index 9e2d1ac2..5d8a9243 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java @@ -1,7 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.hystrix; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration; import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter; import cn.hippo4j.common.config.ApplicationContextHolder; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,6 +33,7 @@ import org.springframework.context.annotation.Configuration; * @create: 2022-07-15 **/ @Configuration(proxyBeanMethods = false) +@AutoConfigureAfter(ThreadPoolAdapterExtraAutoConfiguration.class) public class HystrixAdapterAutoConfiguration { @Bean @@ -22,7 +43,7 @@ public class HystrixAdapterAutoConfiguration { } @Bean - public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(){ - return new HystrixThreadPoolAdapter(); + public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { + return new HystrixThreadPoolAdapter(threadPoolAdapterExtra); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index dd8d8c93..d218a7e4 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -18,6 +18,7 @@ package cn.hippo4j.springboot.starter.config; import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; import cn.hippo4j.common.api.ThreadDetailState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.config.UtilAutoConfiguration; @@ -64,7 +65,8 @@ import org.springframework.core.env.ConfigurableEnvironment; @ConditionalOnBean(MarkerConfiguration.Marker.class) @EnableConfigurationProperties(BootstrapProperties.class) @ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") -@ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, WebThreadPoolConfiguration.class}) +@ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, + WebThreadPoolConfiguration.class}) public class DynamicThreadPoolAutoConfiguration { private final BootstrapProperties properties; @@ -116,7 +118,6 @@ public class DynamicThreadPoolAutoConfiguration { return new WebThreadPoolRunStateController(threadPoolRunStateHandler, threadDetailState); } - @Bean @ConditionalOnMissingBean @SuppressWarnings("all") @@ -165,7 +166,7 @@ public class DynamicThreadPoolAutoConfiguration { @Bean @SuppressWarnings("all") - public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) { - return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils); + public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ThreadPoolAdapterExtra threadPoolAdapterExtra) { + return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils, threadPoolAdapterExtra); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java index 85363404..1ac2a0fa 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.config; import cn.hippo4j.springboot.starter.monitor.send.netty.NettyConnectSender; @@ -22,7 +39,7 @@ public class NettyClientConfiguration { } @Bean - public MessageSender messageSender(ServerNettyAgent serverNettyAgent){ + public MessageSender messageSender(ServerNettyAgent serverNettyAgent) { return new NettyConnectSender(serverNettyAgent); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java index d081f1a1..2beea738 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java @@ -17,6 +17,7 @@ package cn.hippo4j.springboot.starter.core; +import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.common.api.ClientCloseHookExecute; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.constant.Constants; @@ -31,6 +32,7 @@ import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -141,7 +143,8 @@ public class DiscoveryClient implements DisposableBean { boolean success = register(); // TODO Abstract server registration logic ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class); - adapterRegister.register(); + Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); + adapterRegister.register(threadPoolAdapterMap); if (success) { instanceInfo.unsetIsDirty(timestamp); } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index a51c06ab..6a1e3b1a 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.core; import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; @@ -56,13 +57,17 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { private final InetUtils hippo4JInetUtils; + private final ThreadPoolAdapterExtra threadPoolAdapterExtra; + @Override public void run(ApplicationArguments args) throws Exception { - register(); + Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); + register(threadPoolAdapterMap); + threadPoolAdapterExtra.extraStart(map -> register(map)); } - public void register() { - Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); + public void register(Map threadPoolAdapterMap) { + List cacheConfigList = Lists.newArrayList(); threadPoolAdapterMap.forEach((key, val) -> { List threadPoolStates = val.getThreadPoolStates(); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java index c30cfa6b..370aaf6b 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.monitor.send.netty; import cn.hippo4j.common.monitor.Message; @@ -39,7 +56,8 @@ public class NettyConnectSender implements MessageSender { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) - .handler(new ChannelInitializer(){ + .handler(new ChannelInitializer() { + @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); @@ -52,9 +70,9 @@ public class NettyConnectSender implements MessageSender { bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync(); } catch (Exception e) { - log.error("netty send error ",e); - } /*finally { - eventLoopGroup.shutdownGracefully(); - }*/ + log.error("netty send error ", e); + } /* + * finally { eventLoopGroup.shutdownGracefully(); } + */ } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java index 72832b52..7cf68542 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.monitor.send.netty; import cn.hippo4j.common.monitor.MessageWrapper; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java index 34f6cfc2..c384e2eb 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java @@ -79,7 +79,7 @@ public class ServerListManager { return currentServerAddr; } - public String getNettyServerPort(){ + public String getNettyServerPort() { return nettyServerPort; } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java index 52bf884f..a2958173 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.remote; import cn.hippo4j.springboot.starter.config.BootstrapProperties; @@ -18,21 +35,21 @@ public class ServerNettyAgent { private EventLoopGroup eventLoopGroup; - public ServerNettyAgent(BootstrapProperties properties){ + public ServerNettyAgent(BootstrapProperties properties) { this.dynamicThreadPoolProperties = properties; this.serverListManager = new ServerListManager(dynamicThreadPoolProperties); this.eventLoopGroup = new NioEventLoopGroup(); } - public EventLoopGroup getEventLoopGroup(){ + public EventLoopGroup getEventLoopGroup() { return eventLoopGroup; } public String getNettyServerAddress() { - return serverListManager.getCurrentServerAddr().split(":")[1].replace("//",""); + return serverListManager.getCurrentServerAddr().split(":")[1].replace("//", ""); } - public Integer getNettyServerPort(){ + public Integer getNettyServerPort() { return Integer.parseInt(serverListManager.getNettyServerPort()); } }