监控适配netty

pull/277/head
shining-stars-lk 2 years ago
parent 6f6ad9e01d
commit 4b644453ba

@ -69,5 +69,10 @@
<groupId>cn.hippo4j</groupId> <groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId> <artifactId>hippo4j-adapter-base</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -0,0 +1,32 @@
package cn.hippo4j.config.config;
import cn.hippo4j.config.netty.MonitorNettyServer;
import cn.hippo4j.config.service.biz.HisRunDataService;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyServerConfig {
@Bean
public EventLoopGroup bossGroup(){
return new NioEventLoopGroup();
}
@Bean
public EventLoopGroup workGroup(){
return new NioEventLoopGroup();
}
@Bean
public MonitorNettyServer monitorNettyServer(ServerBootstrapProperties serverBootstrapProperties,
HisRunDataService hisRunDataService,
EventLoopGroup bossGroup,
EventLoopGroup workGroup){
return new MonitorNettyServer(serverBootstrapProperties,hisRunDataService,bossGroup,workGroup);
}
}

@ -48,4 +48,5 @@ public class ServerBootstrapProperties {
*/ */
private Integer cleanHistoryDataPeriod = 30; private Integer cleanHistoryDataPeriod = 30;
private String nettyServerPort = "8899";
} }

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.console.controller; package cn.hippo4j.config.controller;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
@ -70,15 +70,6 @@ public class MonitorController {
@PostMapping @PostMapping
public Result<Void> dataCollect(@RequestBody MessageWrapper messageWrapper) { public Result<Void> dataCollect(@RequestBody MessageWrapper messageWrapper) {
Runnable task = () -> { return hisRunDataService.dataCollect(messageWrapper);
Message message = MessageConvert.convert(messageWrapper);
queryMonitorExecuteChoose.chooseAndExecute(message);
};
try {
monitorThreadPoolTaskExecutor.execute(task);
} catch (Exception ex) {
log.error("Monitoring data insertion database task overflow.", ex);
}
return Results.success();
} }
} }

@ -1,6 +1,8 @@
package cn.hippo4j.console.netty; package cn.hippo4j.config.netty;
import cn.hippo4j.config.config.ServerBootstrapProperties;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose; import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import cn.hippo4j.config.service.biz.HisRunDataService;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
@ -20,6 +22,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Netty MonitorNettyServer * Netty MonitorNettyServer
@ -29,19 +32,19 @@ import javax.annotation.PostConstruct;
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
@Component
public class MonitorNettyServer { public class MonitorNettyServer {
private final QueryMonitorExecuteChoose queryMonitorExecuteChoose; private ServerBootstrapProperties serverBootstrapProperties;
private final ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor; private HisRunDataService hisRunDataService;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
@PostConstruct @PostConstruct
public void nettyServerInit(){ public void nettyServerInit(){
new Thread(() -> { new Thread(() -> {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try { try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup) serverBootstrap.group(bossGroup,workGroup)
@ -56,18 +59,17 @@ public class MonitorNettyServer {
pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null))); ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ServerHandler(queryMonitorExecuteChoose, pipeline.addLast(new ServerHandler(hisRunDataService));
monitorThreadPoolTaskExecutor));
} }
}); });
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); ChannelFuture channelFuture = serverBootstrap.bind(Integer.parseInt(serverBootstrapProperties.getNettyServerPort())).sync();
channelFuture.channel().closeFuture().sync(); channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) { } catch (Exception e) {
e.printStackTrace(); log.error("nettyServerInit error",e);
} finally { } finally {
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
workGroup.shutdownGracefully(); workGroup.shutdownGracefully();
} }
},"netty-server-thread").start(); },"nettyServerInit thread").start();
} }
} }

@ -1,9 +1,10 @@
package cn.hippo4j.console.netty; package cn.hippo4j.config.netty;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper; import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.toolkit.MessageConvert; import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose; import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import cn.hippo4j.config.service.biz.HisRunDataService;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
@ -20,20 +21,10 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@AllArgsConstructor @AllArgsConstructor
public class ServerHandler extends SimpleChannelInboundHandler<MessageWrapper> { public class ServerHandler extends SimpleChannelInboundHandler<MessageWrapper> {
private QueryMonitorExecuteChoose queryMonitorExecuteChoose; private HisRunDataService hisRunDataService;
private ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor;
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, MessageWrapper msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, MessageWrapper msg) throws Exception {
Runnable task = () -> { hisRunDataService.dataCollect(msg);
Message message = MessageConvert.convert(msg);
queryMonitorExecuteChoose.chooseAndExecute(message);
};
try {
monitorThreadPoolTaskExecutor.execute(task);
} catch (Exception ex) {
log.error("Monitoring data insertion database task overflow.", ex);
}
} }
} }

@ -18,11 +18,14 @@
package cn.hippo4j.config.service.biz; package cn.hippo4j.config.service.biz;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.config.model.HisRunDataInfo; import cn.hippo4j.config.model.HisRunDataInfo;
import cn.hippo4j.config.model.biz.monitor.MonitorActiveRespDTO; import cn.hippo4j.config.model.biz.monitor.MonitorActiveRespDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorQueryReqDTO; import cn.hippo4j.config.model.biz.monitor.MonitorQueryReqDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorRespDTO; import cn.hippo4j.config.model.biz.monitor.MonitorRespDTO;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List; import java.util.List;
@ -65,4 +68,11 @@ public interface HisRunDataService extends IService<HisRunDataInfo> {
*/ */
void save(Message message); void save(Message message);
/**
* dataCollect.
*
* @param messageWrapper
*/
Result<Void> dataCollect(MessageWrapper messageWrapper);
} }

@ -18,14 +18,19 @@
package cn.hippo4j.config.service.biz.impl; package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.monitor.RuntimeMessage; import cn.hippo4j.common.monitor.RuntimeMessage;
import cn.hippo4j.common.toolkit.GroupKey; import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.config.ServerBootstrapProperties; import cn.hippo4j.config.config.ServerBootstrapProperties;
import cn.hippo4j.config.mapper.HisRunDataMapper; import cn.hippo4j.config.mapper.HisRunDataMapper;
import cn.hippo4j.config.model.HisRunDataInfo; import cn.hippo4j.config.model.HisRunDataInfo;
import cn.hippo4j.config.model.biz.monitor.MonitorActiveRespDTO; import cn.hippo4j.config.model.biz.monitor.MonitorActiveRespDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorQueryReqDTO; import cn.hippo4j.config.model.biz.monitor.MonitorQueryReqDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorRespDTO; import cn.hippo4j.config.model.biz.monitor.MonitorRespDTO;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import cn.hippo4j.config.service.biz.HisRunDataService; import cn.hippo4j.config.service.biz.HisRunDataService;
import cn.hippo4j.config.toolkit.BeanUtil; import cn.hippo4j.config.toolkit.BeanUtil;
import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateTime;
@ -33,6 +38,7 @@ import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -54,6 +60,10 @@ public class HisRunDataServiceImpl extends ServiceImpl<HisRunDataMapper, HisRunD
private final ServerBootstrapProperties properties; private final ServerBootstrapProperties properties;
private final QueryMonitorExecuteChoose queryMonitorExecuteChoose;
private final ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor;
@Override @Override
public List<MonitorRespDTO> query(MonitorQueryReqDTO reqDTO) { public List<MonitorRespDTO> query(MonitorQueryReqDTO reqDTO) {
Date currentDate = new Date(); Date currentDate = new Date();
@ -166,4 +176,18 @@ public class HisRunDataServiceImpl extends ServiceImpl<HisRunDataMapper, HisRunD
this.saveBatch(hisRunDataInfos); this.saveBatch(hisRunDataInfos);
} }
@Override
public Result<Void> dataCollect(MessageWrapper messageWrapper) {
Runnable task = () -> {
Message message = MessageConvert.convert(messageWrapper);
queryMonitorExecuteChoose.chooseAndExecute(message);
};
try {
monitorThreadPoolTaskExecutor.execute(task);
} catch (Exception ex) {
log.error("Monitoring data insertion database task overflow.", ex);
}
return Results.success();
}
} }

@ -43,11 +43,6 @@
<groupId>cn.hutool</groupId> <groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId> <artifactId>hutool-all</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

@ -7,7 +7,7 @@ spring.application.name=dynamic-threadpool-example
spring.dynamic.thread-pool.server-addr=http://localhost:6691 spring.dynamic.thread-pool.server-addr=http://localhost:6691
spring.dynamic.thread-pool.netty-server-port=8899 spring.dynamic.thread-pool.netty-server-port=8899
spring.dynamic.thread-pool.report-type=netty #spring.dynamic.thread-pool.report-type=netty
spring.dynamic.thread-pool.namespace=prescription spring.dynamic.thread-pool.namespace=prescription
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
spring.dynamic.thread-pool.username=admin spring.dynamic.thread-pool.username=admin

@ -36,13 +36,12 @@ import cn.hippo4j.springboot.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.springboot.starter.core.ThreadPoolAdapterRegister; import cn.hippo4j.springboot.starter.core.ThreadPoolAdapterRegister;
import cn.hippo4j.springboot.starter.monitor.ReportingEventExecutor; import cn.hippo4j.springboot.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.springboot.starter.monitor.collect.RunTimeInfoCollector; import cn.hippo4j.springboot.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.springboot.starter.monitor.send.HttpConnectSender; import cn.hippo4j.springboot.starter.monitor.send.http.HttpConnectSender;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender; import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -121,7 +120,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@SuppressWarnings("all") @SuppressWarnings("all")
public MessageSender httpMvcSender(HttpAgent httpAgent) { public MessageSender messageSender(HttpAgent httpAgent) {
return new HttpConnectSender(httpAgent); return new HttpConnectSender(httpAgent);
} }

@ -1,13 +1,10 @@
package cn.hippo4j.springboot.starter.config; package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.springboot.starter.monitor.netty.NettyConnectSender; import cn.hippo4j.springboot.starter.monitor.send.netty.NettyConnectSender;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender; import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.ServerNettyAgent; import cn.hippo4j.springboot.starter.remote.ServerNettyAgent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** /**
* Netty ClientCon figuration * Netty ClientCon figuration
@ -25,7 +22,7 @@ public class NettyClientConfiguration {
} }
@Bean @Bean
public MessageSender nettyConnectSender(ServerNettyAgent serverNettyAgent){ public MessageSender messageSender(ServerNettyAgent serverNettyAgent){
return new NettyConnectSender(serverNettyAgent); return new NettyConnectSender(serverNettyAgent);
} }
} }

@ -15,11 +15,12 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.springboot.starter.monitor.send; package cn.hippo4j.springboot.starter.monitor.send.http;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper; import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.toolkit.MessageConvert; import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

@ -1,4 +1,4 @@
package cn.hippo4j.springboot.starter.monitor.netty; package cn.hippo4j.springboot.starter.monitor.send.netty;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper; import cn.hippo4j.common.monitor.MessageWrapper;
@ -53,8 +53,8 @@ public class NettyConnectSender implements MessageSender {
bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync(); bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync();
} catch (Exception e) { } catch (Exception e) {
log.error("netty send error ",e); log.error("netty send error ",e);
} finally { } /*finally {
eventLoopGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully();
} }*/
} }
} }

@ -1,4 +1,4 @@
package cn.hippo4j.springboot.starter.monitor.netty; package cn.hippo4j.springboot.starter.monitor.send.netty;
import cn.hippo4j.common.monitor.MessageWrapper; import cn.hippo4j.common.monitor.MessageWrapper;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

@ -16,13 +16,15 @@ public class ServerNettyAgent {
private final ServerListManager serverListManager; private final ServerListManager serverListManager;
private EventLoopGroup eventLoopGroup;
public ServerNettyAgent(BootstrapProperties properties){ public ServerNettyAgent(BootstrapProperties properties){
this.dynamicThreadPoolProperties = properties; this.dynamicThreadPoolProperties = properties;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties); this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
this.eventLoopGroup = new NioEventLoopGroup();
} }
public EventLoopGroup getEventLoopGroup(){ public EventLoopGroup getEventLoopGroup(){
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
return eventLoopGroup; return eventLoopGroup;
} }

Loading…
Cancel
Save