Merge pull request #277 from shining-stars-lk/develop

监控添加netty通信
1.3.1
小马哥 3 years ago committed by GitHub
commit ce53d6b36b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,6 +21,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@ -30,7 +31,7 @@ import java.util.Map;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageWrapper implements MessageRequest<Message> {
public class MessageWrapper implements MessageRequest<Message>, Serializable {
/**
* contentParams

@ -69,5 +69,10 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,32 @@
package cn.hippo4j.config.config;
import cn.hippo4j.config.netty.MonitorNettyServer;
import cn.hippo4j.config.service.biz.HisRunDataService;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyServerConfig {
@Bean
public EventLoopGroup bossGroup(){
return new NioEventLoopGroup();
}
@Bean
public EventLoopGroup workGroup(){
return new NioEventLoopGroup();
}
@Bean
public MonitorNettyServer monitorNettyServer(ServerBootstrapProperties serverBootstrapProperties,
HisRunDataService hisRunDataService,
EventLoopGroup bossGroup,
EventLoopGroup workGroup){
return new MonitorNettyServer(serverBootstrapProperties,hisRunDataService,bossGroup,workGroup);
}
}

@ -48,4 +48,5 @@ public class ServerBootstrapProperties {
*/
private Integer cleanHistoryDataPeriod = 30;
private String nettyServerPort = "8899";
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.console.controller;
package cn.hippo4j.config.controller;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.monitor.Message;
@ -70,15 +70,6 @@ public class MonitorController {
@PostMapping
public Result<Void> dataCollect(@RequestBody MessageWrapper messageWrapper) {
Runnable task = () -> {
Message message = MessageConvert.convert(messageWrapper);
queryMonitorExecuteChoose.chooseAndExecute(message);
};
try {
monitorThreadPoolTaskExecutor.execute(task);
} catch (Exception ex) {
log.error("Monitoring data insertion database task overflow.", ex);
}
return Results.success();
return hisRunDataService.dataCollect(messageWrapper);
}
}

@ -0,0 +1,79 @@
package cn.hippo4j.config.netty;
import cn.hippo4j.config.config.ServerBootstrapProperties;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import cn.hippo4j.config.service.biz.HisRunDataService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Netty MonitorNettyServer
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
public class MonitorNettyServer {
private ServerBootstrapProperties serverBootstrapProperties;
private HisRunDataService hisRunDataService;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
@PostConstruct
public void nettyServerInit(){
new Thread(() -> {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
//childHandler的任务由workGroup来执行
//如果是handler则由bossGroup来执行
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ServerHandler(hisRunDataService));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(Integer.parseInt(serverBootstrapProperties.getNettyServerPort())).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("nettyServerInit error",e);
}
},"nettyServerInit thread").start();
}
@PreDestroy
public void destroy(){
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

@ -0,0 +1,30 @@
package cn.hippo4j.config.netty;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import cn.hippo4j.config.service.biz.HisRunDataService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* ServerHandler
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
public class ServerHandler extends SimpleChannelInboundHandler<MessageWrapper> {
private HisRunDataService hisRunDataService;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageWrapper msg) throws Exception {
hisRunDataService.dataCollect(msg);
}
}

@ -18,11 +18,14 @@
package cn.hippo4j.config.service.biz;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.config.model.HisRunDataInfo;
import cn.hippo4j.config.model.biz.monitor.MonitorActiveRespDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorQueryReqDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorRespDTO;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
@ -65,4 +68,11 @@ public interface HisRunDataService extends IService<HisRunDataInfo> {
*/
void save(Message message);
/**
* dataCollect.
*
* @param messageWrapper
*/
Result<Void> dataCollect(MessageWrapper messageWrapper);
}

@ -18,14 +18,19 @@
package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.monitor.RuntimeMessage;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.config.ServerBootstrapProperties;
import cn.hippo4j.config.mapper.HisRunDataMapper;
import cn.hippo4j.config.model.HisRunDataInfo;
import cn.hippo4j.config.model.biz.monitor.MonitorActiveRespDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorQueryReqDTO;
import cn.hippo4j.config.model.biz.monitor.MonitorRespDTO;
import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import cn.hippo4j.config.service.biz.HisRunDataService;
import cn.hippo4j.config.toolkit.BeanUtil;
import cn.hutool.core.date.DateTime;
@ -33,6 +38,7 @@ import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -54,6 +60,10 @@ public class HisRunDataServiceImpl extends ServiceImpl<HisRunDataMapper, HisRunD
private final ServerBootstrapProperties properties;
private final QueryMonitorExecuteChoose queryMonitorExecuteChoose;
private final ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor;
@Override
public List<MonitorRespDTO> query(MonitorQueryReqDTO reqDTO) {
Date currentDate = new Date();
@ -166,4 +176,18 @@ public class HisRunDataServiceImpl extends ServiceImpl<HisRunDataMapper, HisRunD
this.saveBatch(hisRunDataInfos);
}
@Override
public Result<Void> dataCollect(MessageWrapper messageWrapper) {
Runnable task = () -> {
Message message = MessageConvert.convert(messageWrapper);
queryMonitorExecuteChoose.chooseAndExecute(message);
};
try {
monitorThreadPoolTaskExecutor.execute(task);
} catch (Exception ex) {
log.error("Monitoring data insertion database task overflow.", ex);
}
return Results.success();
}
}

@ -6,6 +6,8 @@ spring.profiles.active=dev
spring.application.name=dynamic-threadpool-example
spring.dynamic.thread-pool.server-addr=http://localhost:6691
spring.dynamic.thread-pool.netty-server-port=8899
spring.dynamic.thread-pool.report-type=netty
spring.dynamic.thread-pool.namespace=prescription
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
spring.dynamic.thread-pool.username=admin

@ -55,6 +55,7 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
dynamicRefresh(configContent, null);
}
@Override
public void dynamicRefresh(String configContent, Map<String, Object> newValueChangeMap) {
try {
Map<Object, Object> configInfo = ConfigParserHandler.getInstance().parseConfig(configContent, bootstrapCoreProperties.getConfigFileType());

@ -86,6 +86,12 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
<build>

@ -52,6 +52,15 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {
*/
private String serverAddr;
/**
* Netty server port
*/
private String nettyServerPort;
/**
* Report type
*/
private String reportType;
/**
* Namespace
*/

@ -36,7 +36,7 @@ import cn.hippo4j.springboot.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.springboot.starter.core.ThreadPoolAdapterRegister;
import cn.hippo4j.springboot.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.springboot.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.springboot.starter.monitor.send.HttpConnectSender;
import cn.hippo4j.springboot.starter.monitor.send.http.HttpConnectSender;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck;
@ -64,7 +64,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnBean(MarkerConfiguration.Marker.class)
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({HttpClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, WebThreadPoolConfiguration.class})
@ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, WebThreadPoolConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties;
@ -116,9 +116,11 @@ public class DynamicThreadPoolAutoConfiguration {
return new WebThreadPoolRunStateController(threadPoolRunStateHandler, threadDetailState);
}
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("all")
public HttpConnectSender httpMvcSender(HttpAgent httpAgent) {
public MessageSender messageSender(HttpAgent httpAgent) {
return new HttpConnectSender(httpAgent);
}

@ -0,0 +1,28 @@
package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.springboot.starter.monitor.send.netty.NettyConnectSender;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.ServerNettyAgent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
/**
* Netty ClientCon figuration
*
* @author lk
* @date 2022/6/18
*/
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, name = "report-type", matchIfMissing = false, havingValue = "netty")
public class NettyClientConfiguration {
@Bean
@SuppressWarnings("all")
public ServerNettyAgent serverNettyAgent(BootstrapProperties properties) {
return new ServerNettyAgent(properties);
}
@Bean
public MessageSender messageSender(ServerNettyAgent serverNettyAgent){
return new NettyConnectSender(serverNettyAgent);
}
}

@ -15,11 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.monitor.send;
package cn.hippo4j.springboot.starter.monitor.send.http;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@ -0,0 +1,60 @@
package cn.hippo4j.springboot.starter.monitor.send.netty;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper;
import cn.hippo4j.common.toolkit.MessageConvert;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.remote.ServerNettyAgent;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Netty ConnectSender
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
@Component
public class NettyConnectSender implements MessageSender {
private ServerNettyAgent serverNettyAgent;
@Override
public void send(Message message) {
MessageWrapper messageWrapper = MessageConvert.convert(message);
EventLoopGroup eventLoopGroup = serverNettyAgent.getEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new SenderHandler(messageWrapper));
}
});
bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync();
} catch (Exception e) {
log.error("netty send error ",e);
} /*finally {
eventLoopGroup.shutdownGracefully();
}*/
}
}

@ -0,0 +1,30 @@
package cn.hippo4j.springboot.starter.monitor.send.netty;
import cn.hippo4j.common.monitor.MessageWrapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* SenderHandler
*
* @author lk
* @date 2022/06/18
*/
@Slf4j
@AllArgsConstructor
public class SenderHandler extends SimpleChannelInboundHandler<MessageWrapper> {
private MessageWrapper messageWrapper;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageWrapper msg) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush(messageWrapper);
}
}

@ -40,6 +40,8 @@ public class ServerListManager {
private String serverAddrsStr;
private String nettyServerPort;
@Getter
volatile List<String> serverUrls = new ArrayList();
@ -52,6 +54,7 @@ public class ServerListManager {
public ServerListManager(BootstrapProperties dynamicThreadPoolProperties) {
this.properties = dynamicThreadPoolProperties;
serverAddrsStr = properties.getServerAddr();
nettyServerPort = properties.getNettyServerPort();
if (!StringUtils.isEmpty(serverAddrsStr)) {
List<String> serverAddrList = new ArrayList();
String[] serverAddrListArr = this.serverAddrsStr.split(",");
@ -76,6 +79,10 @@ public class ServerListManager {
return currentServerAddr;
}
public String getNettyServerPort(){
return nettyServerPort;
}
Iterator<String> iterator() {
return new ServerAddressIterator(serverUrls);
}

@ -0,0 +1,38 @@
package cn.hippo4j.springboot.starter.remote;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
/**
* Server Netty Agent
*
* @author lk
* @date 2022/6/18
*/
public class ServerNettyAgent {
private final BootstrapProperties dynamicThreadPoolProperties;
private final ServerListManager serverListManager;
private EventLoopGroup eventLoopGroup;
public ServerNettyAgent(BootstrapProperties properties){
this.dynamicThreadPoolProperties = properties;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
this.eventLoopGroup = new NioEventLoopGroup();
}
public EventLoopGroup getEventLoopGroup(){
return eventLoopGroup;
}
public String getNettyServerAddress() {
return serverListManager.getCurrentServerAddr().split(":")[1].replace("//","");
}
public Integer getNettyServerPort(){
return Integer.parseInt(serverListManager.getNettyServerPort());
}
}

@ -6,6 +6,18 @@
"defaultValue": "localhost:6691",
"description": "dynamic thread-pool server address."
},
{
"name": "spring.dynamic.thread-pool.netty-server-port",
"type": "java.lang.String",
"defaultValue": "8899",
"description": "dynamic thread-pool server netty port."
},
{
"name": "spring.dynamic.thread-pool.report-type",
"type": "java.lang.String",
"defaultValue": "http",
"description": "dynamic thread-pool report-type."
},
{
"name": "spring.dynamic.thread-pool.namespace",
"type": "java.lang.String",

@ -50,6 +50,7 @@
<rocketmq.version>2.2.2</rocketmq.version>
<tomcat-embed-core.version>9.0.55</tomcat-embed-core.version>
<spring-cloud-starter-stream-rocketmq.version>2.2.6.RELEASE</spring-cloud-starter-stream-rocketmq.version>
<netty.version>4.1.10.Final</netty.version>
<maven.javadoc.failOnError>false</maven.javadoc.failOnError>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
@ -237,6 +238,12 @@
<version>${spring-boot.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Loading…
Cancel
Save