Add monitoring of Hystrix thread pools

pull/284/head
shiming-stars-lk 2 years ago committed by shining-stars-lk
parent 3ed35efee1
commit 26dea183c6

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.base;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* thread pool adapter extra.
*/
@Slf4j
public class ThreadPoolAdapterExtra {
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private BlockingQueue<Map<String, ThreadPoolAdapter>> blockingQueue;
public ThreadPoolAdapterExtra() {
blockingQueue = new ArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY);
}
public void offerQueue(Map<String, ThreadPoolAdapter> map) throws InterruptedException {
blockingQueue.offer(map, 5, TimeUnit.SECONDS);
}
public void extraStart(ThreadPoolAdapterExtraHandle threadPoolAdapterExtraHandle) {
new Thread(() -> {
try {
for (;;) {
Map<String, ThreadPoolAdapter> map = blockingQueue.take();
threadPoolAdapterExtraHandle.execute(map);
}
} catch (InterruptedException e) {
log.error("extraStart error", e);
}
}, "threadPoolAdapterExtra").start();
}
}

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.base;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* thread pool adapter extra auto configuration.
*/
@Configuration(proxyBeanMethods = false)
public class ThreadPoolAdapterExtraAutoConfiguration {
@Bean
public ThreadPoolAdapterExtra threadPoolAdapterExtra() {
return new ThreadPoolAdapterExtra();
}
}

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.base;
import java.util.Map;
/**
* Thread Pool Adapter Extra Handle
**/
@FunctionalInterface
public interface ThreadPoolAdapterExtraHandle {
void execute(Map<String, ThreadPoolAdapter> map);
}

@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration

@ -18,20 +18,29 @@
package cn.hippo4j.adapter.hystrix; package cn.hippo4j.adapter.hystrix;
import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.hystrix.HystrixThreadPool; import com.netflix.hystrix.HystrixThreadPool;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
@ -45,8 +54,25 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL
private static final String THREAD_POOLS_FIELD = "threadPools"; private static final String THREAD_POOLS_FIELD = "threadPools";
private static final int TASK_INTERVAL_SECONDS = 2;
private final Map<String, ThreadPoolExecutor> HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); private final Map<String, ThreadPoolExecutor> HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap();
private final ScheduledExecutorService scheduler;
private ThreadPoolAdapterExtra threadPoolAdapterExtra;
public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) {
this.threadPoolAdapterExtra = threadPoolAdapterExtra;
scheduler = new ScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder()
.setNameFormat("hystrixThreadPoolAdapter")
.setDaemon(true)
.build());
}
@Override @Override
public String mark() { public String mark() {
return "hystrix"; return "hystrix";
@ -55,17 +81,24 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL
@Override @Override
public ThreadPoolAdapterState getThreadPoolState(String identify) { public ThreadPoolAdapterState getThreadPoolState(String identify) {
ThreadPoolAdapterState result = new ThreadPoolAdapterState(); ThreadPoolAdapterState result = new ThreadPoolAdapterState();
ThreadPoolExecutor rocketMQConsumeExecutor = HYSTRIX_CONSUME_EXECUTOR.get(identify); ThreadPoolExecutor threadPoolExecutor = HYSTRIX_CONSUME_EXECUTOR.get(identify);
if (rocketMQConsumeExecutor != null) { if (threadPoolExecutor != null) {
result.setThreadPoolKey(identify); result.setThreadPoolKey(identify);
result.setCoreSize(rocketMQConsumeExecutor.getCorePoolSize()); result.setCoreSize(threadPoolExecutor.getCorePoolSize());
result.setMaximumSize(rocketMQConsumeExecutor.getMaximumPoolSize()); result.setMaximumSize(threadPoolExecutor.getMaximumPoolSize());
return result; return result;
} }
log.warn("[{}] hystrix thread pool not found.", identify); log.warn("[{}] hystrix thread pool not found.", identify);
return result; return result;
} }
@Override
public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
HYSTRIX_CONSUME_EXECUTOR.forEach((kel, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(val))));
return threadPoolAdapterStates;
}
@Override @Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
@ -87,32 +120,65 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL
@Override @Override
public void onApplicationEvent(ApplicationStartedEvent event) { public void onApplicationEvent(ApplicationStartedEvent event) {
HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler);
scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
public void hystrixThreadPoolRefresh() {
try { try {
boolean addExtraFlag = false;
Class<HystrixThreadPool.Factory> factoryClass = HystrixThreadPool.Factory.class; Class<HystrixThreadPool.Factory> factoryClass = HystrixThreadPool.Factory.class;
Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD);
threadPoolsField.setAccessible(true); threadPoolsField.setAccessible(true);
ConcurrentHashMap<String, HystrixThreadPool> threadPools = ConcurrentHashMap<String, HystrixThreadPool> threadPools =
(ConcurrentHashMap<String, HystrixThreadPool>)threadPoolsField.get(factoryClass); (ConcurrentHashMap<String, HystrixThreadPool>) threadPoolsField.get(factoryClass);
if (CollectionUtil.isNotEmpty(threadPools)) { if (CollectionUtil.isNotEmpty(threadPools)) {
for (Map.Entry<String, HystrixThreadPool> stringHystrixThreadPoolEntry : threadPools.entrySet()) { for (Map.Entry<String, HystrixThreadPool> stringHystrixThreadPoolEntry : threadPools.entrySet()) {
String key = stringHystrixThreadPoolEntry.getKey(); String key = stringHystrixThreadPoolEntry.getKey();
HystrixThreadPool value = stringHystrixThreadPoolEntry.getValue(); HystrixThreadPool value = stringHystrixThreadPoolEntry.getValue();
if (value instanceof HystrixThreadPool.HystrixThreadPoolDefault) { if (value instanceof HystrixThreadPool.HystrixThreadPoolDefault) {
HystrixThreadPool.HystrixThreadPoolDefault hystrixThreadPoolDefault = HystrixThreadPool.HystrixThreadPoolDefault hystrixThreadPoolDefault =
(HystrixThreadPool.HystrixThreadPoolDefault)value; (HystrixThreadPool.HystrixThreadPoolDefault) value;
Class<? extends HystrixThreadPool.HystrixThreadPoolDefault> hystrixThreadPoolDefaultClass = hystrixThreadPoolDefault.getClass(); Class<? extends HystrixThreadPool.HystrixThreadPoolDefault> hystrixThreadPoolDefaultClass = hystrixThreadPoolDefault.getClass();
Field threadPoolField = hystrixThreadPoolDefaultClass.getDeclaredField(THREAD_POOL_FIELD); Field threadPoolField = hystrixThreadPoolDefaultClass.getDeclaredField(THREAD_POOL_FIELD);
threadPoolField.setAccessible(true); threadPoolField.setAccessible(true);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutor threadPoolExecutor =
(ThreadPoolExecutor)threadPoolField.get(hystrixThreadPoolDefault); (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault);
if (threadPoolExecutor != null) { if (threadPoolExecutor != null && HYSTRIX_CONSUME_EXECUTOR.get(key) == null) {
HYSTRIX_CONSUME_EXECUTOR.put(key,threadPoolExecutor); HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor);
addExtraFlag = true;
} }
} }
} }
} }
}catch (Exception e) { if (addExtraFlag) {
Map<String, ThreadPoolAdapter> map = Maps.newHashMap();
map.putAll(ApplicationContextHolder.getBeansOfType(HystrixThreadPoolAdapter.class));
threadPoolAdapterExtra.offerQueue(map);
}
} catch (Exception e) {
log.error("Failed to get Hystrix thread pool.", e); log.error("Failed to get Hystrix thread pool.", e);
} }
}
class HystrixThreadPoolRefreshTask implements Runnable {
private ScheduledExecutorService scheduler;
public HystrixThreadPoolRefreshTask(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}
@Override
public void run() {
try {
hystrixThreadPoolRefresh();
} finally {
if (!scheduler.isShutdown()) {
scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS);
}
}
}
} }
} }

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.config; package cn.hippo4j.config.config;
import cn.hippo4j.config.netty.MonitorNettyServer; import cn.hippo4j.config.netty.MonitorNettyServer;
@ -11,12 +28,12 @@ import org.springframework.context.annotation.Configuration;
public class NettyServerConfig { public class NettyServerConfig {
@Bean @Bean
public EventLoopGroup bossGroup(){ public EventLoopGroup bossGroup() {
return new NioEventLoopGroup(); return new NioEventLoopGroup();
} }
@Bean @Bean
public EventLoopGroup workGroup(){ public EventLoopGroup workGroup() {
return new NioEventLoopGroup(); return new NioEventLoopGroup();
} }
@ -24,9 +41,7 @@ public class NettyServerConfig {
public MonitorNettyServer monitorNettyServer(ServerBootstrapProperties serverBootstrapProperties, public MonitorNettyServer monitorNettyServer(ServerBootstrapProperties serverBootstrapProperties,
HisRunDataService hisRunDataService, HisRunDataService hisRunDataService,
EventLoopGroup bossGroup, EventLoopGroup bossGroup,
EventLoopGroup workGroup){ EventLoopGroup workGroup) {
return new MonitorNettyServer(serverBootstrapProperties,hisRunDataService,bossGroup,workGroup); return new MonitorNettyServer(serverBootstrapProperties, hisRunDataService, bossGroup, workGroup);
} }
} }

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.netty; package cn.hippo4j.config.netty;
import cn.hippo4j.config.config.ServerBootstrapProperties; import cn.hippo4j.config.config.ServerBootstrapProperties;
@ -44,16 +61,17 @@ public class MonitorNettyServer {
private EventLoopGroup workGroup; private EventLoopGroup workGroup;
@PostConstruct @PostConstruct
public void nettyServerInit(){ public void nettyServerInit() {
new Thread(() -> { new Thread(() -> {
try { try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup) serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)) .handler(new LoggingHandler(LogLevel.INFO))
//childHandler的任务由workGroup来执行 // childHandler的任务由workGroup来执行
//如果是handler则由bossGroup来执行 // 如果是handler则由bossGroup来执行
.childHandler(new ChannelInitializer<SocketChannel>(){ .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
@ -66,13 +84,13 @@ public class MonitorNettyServer {
ChannelFuture channelFuture = serverBootstrap.bind(Integer.parseInt(serverBootstrapProperties.getNettyServerPort())).sync(); ChannelFuture channelFuture = serverBootstrap.bind(Integer.parseInt(serverBootstrapProperties.getNettyServerPort())).sync();
channelFuture.channel().closeFuture().sync(); channelFuture.channel().closeFuture().sync();
} catch (Exception e) { } catch (Exception e) {
log.error("nettyServerInit error",e); log.error("nettyServerInit error", e);
} }
},"nettyServerInit thread").start(); }, "nettyServerInit thread").start();
} }
@PreDestroy @PreDestroy
public void destroy(){ public void destroy() {
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
workGroup.shutdownGracefully(); workGroup.shutdownGracefully();
} }

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.netty; package cn.hippo4j.config.netty;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.kafka.example; package cn.hippo4j.springboot.starter.adapter.kafka.example;
import cn.hippo4j.core.enable.EnableDynamicThreadPool; import cn.hippo4j.core.enable.EnableDynamicThreadPool;

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.kafka.example.consumer; package cn.hippo4j.springboot.starter.adapter.kafka.example.consumer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.kafka.example.produce; package cn.hippo4j.springboot.starter.adapter.kafka.example.produce;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;

@ -1,7 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.hystrix; package cn.hippo4j.springboot.starter.adapter.hystrix;
import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra;
import cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration;
import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter; import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -13,6 +33,7 @@ import org.springframework.context.annotation.Configuration;
* @create: 2022-07-15 * @create: 2022-07-15
**/ **/
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(ThreadPoolAdapterExtraAutoConfiguration.class)
public class HystrixAdapterAutoConfiguration { public class HystrixAdapterAutoConfiguration {
@Bean @Bean
@ -22,7 +43,7 @@ public class HystrixAdapterAutoConfiguration {
} }
@Bean @Bean
public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(){ public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) {
return new HystrixThreadPoolAdapter(); return new HystrixThreadPoolAdapter(threadPoolAdapterExtra);
} }
} }

@ -18,6 +18,7 @@
package cn.hippo4j.springboot.starter.config; package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer; import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer;
import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra;
import cn.hippo4j.common.api.ThreadDetailState; import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.config.UtilAutoConfiguration; import cn.hippo4j.core.config.UtilAutoConfiguration;
@ -64,7 +65,8 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnBean(MarkerConfiguration.Marker.class) @ConditionalOnBean(MarkerConfiguration.Marker.class)
@EnableConfigurationProperties(BootstrapProperties.class) @EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") @ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, WebThreadPoolConfiguration.class}) @ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class,
WebThreadPoolConfiguration.class})
public class DynamicThreadPoolAutoConfiguration { public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties; private final BootstrapProperties properties;
@ -116,7 +118,6 @@ public class DynamicThreadPoolAutoConfiguration {
return new WebThreadPoolRunStateController(threadPoolRunStateHandler, threadDetailState); return new WebThreadPoolRunStateController(threadPoolRunStateHandler, threadDetailState);
} }
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@SuppressWarnings("all") @SuppressWarnings("all")
@ -165,7 +166,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@SuppressWarnings("all") @SuppressWarnings("all")
public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) { public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ThreadPoolAdapterExtra threadPoolAdapterExtra) {
return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils); return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils, threadPoolAdapterExtra);
} }
} }

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.config; package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.springboot.starter.monitor.send.netty.NettyConnectSender; import cn.hippo4j.springboot.starter.monitor.send.netty.NettyConnectSender;
@ -22,7 +39,7 @@ public class NettyClientConfiguration {
} }
@Bean @Bean
public MessageSender messageSender(ServerNettyAgent serverNettyAgent){ public MessageSender messageSender(ServerNettyAgent serverNettyAgent) {
return new NettyConnectSender(serverNettyAgent); return new NettyConnectSender(serverNettyAgent);
} }
} }

@ -17,6 +17,7 @@
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.common.api.ClientCloseHookExecute; import cn.hippo4j.common.api.ClientCloseHookExecute;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
@ -31,6 +32,7 @@ import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -141,7 +143,8 @@ public class DiscoveryClient implements DisposableBean {
boolean success = register(); boolean success = register();
// TODO Abstract server registration logic // TODO Abstract server registration logic
ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class); ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class);
adapterRegister.register(); Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
adapterRegister.register(threadPoolAdapterMap);
if (success) { if (success) {
instanceInfo.unsetIsDirty(timestamp); instanceInfo.unsetIsDirty(timestamp);
} }

@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
@ -56,13 +57,17 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner {
private final InetUtils hippo4JInetUtils; private final InetUtils hippo4JInetUtils;
private final ThreadPoolAdapterExtra threadPoolAdapterExtra;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
register(); Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
register(threadPoolAdapterMap);
threadPoolAdapterExtra.extraStart(map -> register(map));
} }
public void register() { public void register(Map<String, ThreadPoolAdapter> threadPoolAdapterMap) {
Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
List<ThreadPoolAdapterCacheConfig> cacheConfigList = Lists.newArrayList(); List<ThreadPoolAdapterCacheConfig> cacheConfigList = Lists.newArrayList();
threadPoolAdapterMap.forEach((key, val) -> { threadPoolAdapterMap.forEach((key, val) -> {
List<ThreadPoolAdapterState> threadPoolStates = val.getThreadPoolStates(); List<ThreadPoolAdapterState> threadPoolStates = val.getThreadPoolStates();

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.monitor.send.netty; package cn.hippo4j.springboot.starter.monitor.send.netty;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
@ -39,7 +56,8 @@ public class NettyConnectSender implements MessageSender {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup) bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){ .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
@ -52,9 +70,9 @@ public class NettyConnectSender implements MessageSender {
bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync(); bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync();
} catch (Exception e) { } catch (Exception e) {
log.error("netty send error ",e); log.error("netty send error ", e);
} /*finally { } /*
eventLoopGroup.shutdownGracefully(); * finally { eventLoopGroup.shutdownGracefully(); }
}*/ */
} }
} }

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.monitor.send.netty; package cn.hippo4j.springboot.starter.monitor.send.netty;
import cn.hippo4j.common.monitor.MessageWrapper; import cn.hippo4j.common.monitor.MessageWrapper;

@ -79,7 +79,7 @@ public class ServerListManager {
return currentServerAddr; return currentServerAddr;
} }
public String getNettyServerPort(){ public String getNettyServerPort() {
return nettyServerPort; return nettyServerPort;
} }

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.remote; package cn.hippo4j.springboot.starter.remote;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
@ -18,21 +35,21 @@ public class ServerNettyAgent {
private EventLoopGroup eventLoopGroup; private EventLoopGroup eventLoopGroup;
public ServerNettyAgent(BootstrapProperties properties){ public ServerNettyAgent(BootstrapProperties properties) {
this.dynamicThreadPoolProperties = properties; this.dynamicThreadPoolProperties = properties;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties); this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
this.eventLoopGroup = new NioEventLoopGroup(); this.eventLoopGroup = new NioEventLoopGroup();
} }
public EventLoopGroup getEventLoopGroup(){ public EventLoopGroup getEventLoopGroup() {
return eventLoopGroup; return eventLoopGroup;
} }
public String getNettyServerAddress() { public String getNettyServerAddress() {
return serverListManager.getCurrentServerAddr().split(":")[1].replace("//",""); return serverListManager.getCurrentServerAddr().split(":")[1].replace("//", "");
} }
public Integer getNettyServerPort(){ public Integer getNettyServerPort() {
return Integer.parseInt(serverListManager.getNettyServerPort()); return Integer.parseInt(serverListManager.getNettyServerPort());
} }
} }

Loading…
Cancel
Save