监控适配netty

pull/277/head
shining-stars-lk 3 years ago
parent a0c72defcb
commit 6f6ad9e01d

@ -21,6 +21,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@ -30,7 +31,7 @@ import java.util.Map;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageWrapper implements MessageRequest<Message> {
public class MessageWrapper implements MessageRequest<Message>, Serializable {
/**
* contentParams

@ -43,6 +43,11 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
<build>

@ -0,0 +1,73 @@
package cn.hippo4j.console.netty;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* Netty MonitorNettyServer
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
@Component
public class MonitorNettyServer {
private final QueryMonitorExecuteChoose queryMonitorExecuteChoose;
private final ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor;
@PostConstruct
public void nettyServerInit(){
new Thread(() -> {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
//childHandler的任务由workGroup来执行
//如果是handler则由bossGroup来执行
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ServerHandler(queryMonitorExecuteChoose,
monitorThreadPoolTaskExecutor));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
},"netty-server-thread").start();
}
}

@ -0,0 +1,39 @@
package cn.hippo4j.console.netty;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* ServerHandler
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
public class ServerHandler extends SimpleChannelInboundHandler<MessageWrapper> {
private QueryMonitorExecuteChoose queryMonitorExecuteChoose;
private ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageWrapper msg) throws Exception {
Runnable task = () -> {
Message message = MessageConvert.convert(msg);
queryMonitorExecuteChoose.chooseAndExecute(message);
};
try {
monitorThreadPoolTaskExecutor.execute(task);
} catch (Exception ex) {
log.error("Monitoring data insertion database task overflow.", ex);
}
}
}

@ -6,6 +6,8 @@ spring.profiles.active=dev
spring.application.name=dynamic-threadpool-example
spring.dynamic.thread-pool.server-addr=http://localhost:6691
spring.dynamic.thread-pool.netty-server-port=8899
spring.dynamic.thread-pool.report-type=netty
spring.dynamic.thread-pool.namespace=prescription
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
spring.dynamic.thread-pool.username=admin

@ -86,6 +86,12 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
<build>

@ -52,6 +52,15 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {
*/
private String serverAddr;
/**
* Netty server port
*/
private String nettyServerPort;
/**
* Report type
*/
private String reportType;
/**
* Namespace
*/

@ -42,6 +42,7 @@ import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -64,7 +65,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnBean(MarkerConfiguration.Marker.class)
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({HttpClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, WebThreadPoolConfiguration.class})
@ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, WebThreadPoolConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties;
@ -116,9 +117,11 @@ public class DynamicThreadPoolAutoConfiguration {
return new WebThreadPoolRunStateController(threadPoolRunStateHandler, threadDetailState);
}
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("all")
public HttpConnectSender httpMvcSender(HttpAgent httpAgent) {
public MessageSender httpMvcSender(HttpAgent httpAgent) {
return new HttpConnectSender(httpAgent);
}

@ -0,0 +1,31 @@
package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.springboot.starter.monitor.netty.NettyConnectSender;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.ServerNettyAgent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Netty ClientCon figuration
*
* @author lk
* @date 2022/6/18
*/
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, name = "report-type", matchIfMissing = false, havingValue = "netty")
public class NettyClientConfiguration {
@Bean
@SuppressWarnings("all")
public ServerNettyAgent serverNettyAgent(BootstrapProperties properties) {
return new ServerNettyAgent(properties);
}
@Bean
public MessageSender nettyConnectSender(ServerNettyAgent serverNettyAgent){
return new NettyConnectSender(serverNettyAgent);
}
}

@ -0,0 +1,60 @@
package cn.hippo4j.springboot.starter.monitor.netty;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.ServerNettyAgent;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Netty ConnectSender
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
@Component
public class NettyConnectSender implements MessageSender {
private ServerNettyAgent serverNettyAgent;
@Override
public void send(Message message) {
MessageWrapper messageWrapper = MessageConvert.convert(message);
EventLoopGroup eventLoopGroup = serverNettyAgent.getEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new SenderHandler(messageWrapper));
}
});
bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync();
} catch (Exception e) {
log.error("netty send error ",e);
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}

@ -0,0 +1,30 @@
package cn.hippo4j.springboot.starter.monitor.netty;
import cn.hippo4j.common.monitor.MessageWrapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* SenderHandler
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
public class SenderHandler extends SimpleChannelInboundHandler<MessageWrapper> {
private MessageWrapper messageWrapper;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageWrapper msg) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush(messageWrapper);
}
}

@ -40,6 +40,8 @@ public class ServerListManager {
private String serverAddrsStr;
private String nettyServerPort;
@Getter
volatile List<String> serverUrls = new ArrayList();
@ -52,6 +54,7 @@ public class ServerListManager {
public ServerListManager(BootstrapProperties dynamicThreadPoolProperties) {
this.properties = dynamicThreadPoolProperties;
serverAddrsStr = properties.getServerAddr();
nettyServerPort = properties.getNettyServerPort();
if (!StringUtils.isEmpty(serverAddrsStr)) {
List<String> serverAddrList = new ArrayList();
String[] serverAddrListArr = this.serverAddrsStr.split(",");
@ -76,6 +79,10 @@ public class ServerListManager {
return currentServerAddr;
}
public String getNettyServerPort(){
return nettyServerPort;
}
Iterator<String> iterator() {
return new ServerAddressIterator(serverUrls);
}

@ -0,0 +1,36 @@
package cn.hippo4j.springboot.starter.remote;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
/**
* Server Netty Agent
*
* @author lk
* @date 2022/6/18
*/
public class ServerNettyAgent {
private final BootstrapProperties dynamicThreadPoolProperties;
private final ServerListManager serverListManager;
public ServerNettyAgent(BootstrapProperties properties){
this.dynamicThreadPoolProperties = properties;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
}
public EventLoopGroup getEventLoopGroup(){
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
return eventLoopGroup;
}
public String getNettyServerAddress() {
return serverListManager.getCurrentServerAddr().split(":")[1].replace("//","");
}
public Integer getNettyServerPort(){
return Integer.parseInt(serverListManager.getNettyServerPort());
}
}

@ -6,6 +6,18 @@
"defaultValue": "localhost:6691",
"description": "dynamic thread-pool server address."
},
{
"name": "spring.dynamic.thread-pool.netty-server-port",
"type": "java.lang.String",
"defaultValue": "8899",
"description": "dynamic thread-pool server netty port."
},
{
"name": "spring.dynamic.thread-pool.report-type",
"type": "java.lang.String",
"defaultValue": "http",
"description": "dynamic thread-pool report-type."
},
{
"name": "spring.dynamic.thread-pool.namespace",
"type": "java.lang.String",

@ -49,6 +49,7 @@
<apollo.version>1.9.1</apollo.version>
<rocketmq.version>2.2.2</rocketmq.version>
<spring-cloud-starter-stream-rocketmq.version>2.2.6.RELEASE</spring-cloud-starter-stream-rocketmq.version>
<netty.version>4.1.10.Final</netty.version>
<maven.javadoc.failOnError>false</maven.javadoc.failOnError>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
@ -236,6 +237,12 @@
<version>${spring-boot.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Loading…
Cancel
Save