Merge remote-tracking branch 'origin/develop' into develop

pull/453/head
chen.ma 2 years ago
commit f496ceffbe

@ -101,7 +101,7 @@ public class ThreadPoolConfig {
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
return dynamicExecutor;
return messageProduceDynamicExecutor;
}
}

@ -13,7 +13,8 @@ sidebar_position: 2
- <a href="#server-端宕机会影响-client-运行么">Server 端宕机会影响 Client 运行么</a>
- <a href="#hippo4j-的发布方式是怎样的-如何选择正确的版本">Hippo4J 的发布方式是怎样的?如何选择正确的版本</a>
- <a href="#群机器人接受不到通知报警">群机器人接受不到通知报警</a>
- <a href="#设置线程池参数优先级问题">设置线程池参数优先级问题</a>
- <a href="#线程池实例中修改队列容量参数问题">线程池实例中修改队列容量参数问题</a>
## 租户和项目在 Hippo4J 中是什么意思
@ -77,3 +78,11 @@ Hippo4J 发布时可能会涉及到两端发布,分别是 Server 和 Starter
如果使用 hippo4j-server请检查在 hippo4j-server 添加的报警通知记录,是否在客户端项目启动前,因为客户端只有在启动时会去 hippo4j-server 拉取报警通知记录。
重启客户端项目,会重新拉取最新报警推送配置,问题解决。
## 设置线程池参数优先级问题
- 当使用`@DynamicThreadPool`进行修饰的方法中和在管理界面设置中同时存在的话,则管理界面设置的优先级最高;
- 如果连接service端失败的话使用`@DynamicThreadPool`进行修饰设置的优先级最高。
## 线程池实例中修改队列容量参数问题
在线程池管理中添加时,只有当选择队列类型为`ResizableCapacityLinkedBlockingQueue`时,后续再进行修改容量大小时才会实时的刷新修改成功。

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.toolkit;
import lombok.SneakyThrows;
import org.springframework.core.io.ClassPathResource;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
/**
* File util;
*/
public class FileUtil {
@SneakyThrows
public static String readUtf8String(String path) {
String resultReadStr;
ClassPathResource classPathResource = new ClassPathResource(path);
try (
InputStream inputStream = classPathResource.getInputStream();
BufferedInputStream bis = new BufferedInputStream(inputStream);
ByteArrayOutputStream buf = new ByteArrayOutputStream()) {
int result = bis.read();
while (result != -1) {
buf.write((byte) result);
result = bis.read();
}
resultReadStr = buf.toString("UTF-8");
}
return resultReadStr;
}
}

@ -43,7 +43,6 @@ public final class Singleton {
return result == null ? null : (T) result;
}
/**
* Get a singleton object by key.
*

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-spring-boot-starter-es-monitor-example</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-core-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>${tomcat-embed-core.version}</version>
</dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>-->
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>-->
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.es.monitor;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.monitor"})
public class Hippo4JExampleEsMonitorApplication {
public static void main(String[] args) {
SpringApplication.run(Hippo4JExampleEsMonitorApplication.class, args);
}
}

@ -0,0 +1,41 @@
server.port=8088
server.servlet.context-path=/example
spring.profiles.active=dev
spring.application.name=dynamic-threadpool-example
es.thread-pool-state.host = ip1:port,ip2:port
es.thread-pool-state.scheme = http
es.thread-pool-state.userName = xxx
es.thread-pool-state.password = xxx
es.thread-pool-state.index.name = thread-pool-state
spring.dynamic.thread-pool.item-id=test
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.banner=false
spring.dynamic.thread-pool.collect=true
spring.dynamic.thread-pool.collect-type=es
spring.dynamic.thread-pool.notify-platforms[0].platform=DING
spring.dynamic.thread-pool.notify-platforms[0].token=xxx
spring.dynamic.thread-pool.notify-platforms[0].secret=xxx
spring.dynamic.thread-pool.apollo.namespace=threadpool.yaml
spring.dynamic.thread-pool.config-file-type=yaml
spring.dynamic.thread-pool.alarm=true
spring.dynamic.thread-pool.check-state-interval=3000
spring.dynamic.thread-pool.capacity-alarm=80
spring.dynamic.thread-pool.alarm-interval=8
spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume
spring.dynamic.thread-pool.executors[0].core-pool-size=32
spring.dynamic.thread-pool.executors[0].maximum-pool-size=32
spring.dynamic.thread-pool.executors[0].queue-capacity=999
spring.dynamic.thread-pool.executors[0].execute-time-out=1500
spring.dynamic.thread-pool.executors[0].blocking-queue=ResizableCapacityLinkedBlockIngQueue
spring.dynamic.thread-pool.executors[0].rejected-handler=CallerRunsPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=1024
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=untimely-thread-pool
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=20
spring.dynamic.thread-pool.executors[0].notify.interval=2
spring.dynamic.thread-pool.executors[0].notify.receive=xxx

@ -26,6 +26,16 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>

@ -2,14 +2,21 @@ server.port=8088
server.servlet.context-path=/example
management.metrics.export.prometheus.enabled=true
management.server.port=29901
management.endpoints.web.exposure.include=*
spring.profiles.active=dev
spring.application.name=dynamic-threadpool-example
spring.dynamic.thread-pool.server-addr=http://localhost:6691
spring.dynamic.thread-pool.netty-server-port=8899
### Use netty to report thread pool monitoring data. The default is http.
# spring.dynamic.thread-pool.report-type=netty
# spring.dynamic.thread-pool.netty-server-port=8899
spring.dynamic.thread-pool.namespace=prescription
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
spring.dynamic.thread-pool.username=admin
spring.dynamic.thread-pool.password=123456
# Enable server and prometheus monitoring at the same time
spring.dynamic.thread-pool.collect-type=server,prometheus

@ -13,6 +13,7 @@
<modules>
<module>hippo4j-example-core</module>
<module>hippo4j-spring-boot-starter-example</module>
<module>hippo4j-spring-boot-starter-es-monitor-example</module>
<module>hippo4j-core-nacos-spring-boot-starter-example</module>
<module>hippo4j-core-apollo-spring-boot-starter-example</module>
<module>hippo4j-core-zookeeper-spring-boot-starter-example</module>

@ -25,7 +25,7 @@ import cn.hippo4j.message.platform.base.AbstractRobotSendMessageHandler;
import cn.hippo4j.message.platform.base.RobotMessageActualContent;
import cn.hippo4j.message.platform.base.RobotMessageExecuteDTO;
import cn.hippo4j.message.platform.constant.DingAlarmConstants;
import cn.hutool.core.io.FileUtil;
import cn.hippo4j.common.toolkit.FileUtil;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;

@ -26,7 +26,7 @@ import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil;
import cn.hippo4j.common.toolkit.FileUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import lombok.AllArgsConstructor;

@ -17,13 +17,13 @@
package cn.hippo4j.message.platform;
import cn.hippo4j.common.toolkit.FileUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.Singleton;
import cn.hippo4j.message.enums.NotifyPlatformEnum;
import cn.hippo4j.message.platform.base.AbstractRobotSendMessageHandler;
import cn.hippo4j.message.platform.base.RobotMessageActualContent;
import cn.hippo4j.message.platform.base.RobotMessageExecuteDTO;
import cn.hutool.core.io.FileUtil;
import cn.hutool.http.HttpRequest;
import lombok.Data;
import lombok.experimental.Accessors;
@ -44,15 +44,15 @@ public class WeChatSendMessageHandler extends AbstractRobotSendMessageHandler {
@Override
protected RobotMessageActualContent buildMessageActualContent() {
String weChatAlarmTxtKet = "message/robot/dynamic-thread-pool/wechat-alarm.txt";
String weChatConfigTxtKet = "message/robot/dynamic-thread-pool/wechat-alarm.txt";
String weChatAlarmTxtKey = "message/robot/dynamic-thread-pool/wechat-alarm.txt";
String weChatConfigTxtKey = "message/robot/dynamic-thread-pool/wechat-alarm.txt";
RobotMessageActualContent robotMessageActualContent = RobotMessageActualContent.builder()
.receiveSeparator("><@")
.changeSeparator(" ➲ ")
.replaceTxt(WE_CHAT_ALARM_TIMOUT_REPLACE_TXT)
.traceReplaceTxt(WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT)
.alarmMessageContent(Singleton.get(weChatAlarmTxtKet, () -> FileUtil.readUtf8String(weChatAlarmTxtKet)))
.configMessageContent(Singleton.get(weChatConfigTxtKet, () -> FileUtil.readUtf8String(weChatConfigTxtKet)))
.alarmMessageContent(Singleton.get(weChatAlarmTxtKey, () -> FileUtil.readUtf8String(weChatAlarmTxtKey)))
.configMessageContent(Singleton.get(weChatConfigTxtKey, () -> FileUtil.readUtf8String(weChatConfigTxtKey)))
.build();
return robotMessageActualContent;
}

@ -22,5 +22,5 @@ package com.example.monitor.base;
*/
public enum MonitorTypeEnum {
LOG, PROMETHEUS, SERVER
LOG, PROMETHEUS, SERVER, ES
}

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-monitor</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-monitor-es</artifactId>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-monitor-base</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer-core.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.txt</include>
<include>**/*.json</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Implementation-Title>${project.artifactId}</Implementation-Title>
<Implementation-Version>${project.version}</Implementation-Version>
<Build-Time>${maven.build.timestamp}</Build-Time>
<Built-By>chen.ma</Built-By>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.3</version>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.monitor.es;
import cn.hippo4j.common.config.ApplicationContextHolder;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.springframework.core.env.Environment;
import java.util.List;
/**
* Create by yuyang
* 2022/8/4 16:26
*/
@Slf4j
public class EsClientHolder {
private static String host;
private static String scheme;
private static String userName;
private static String password;
private static RestHighLevelClient client;
private static RestHighLevelClient initRestClient() {
try {
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
host = environment.getProperty("es.thread-pool-state.host");
scheme = environment.getProperty("es.thread-pool-state.schema");
userName = environment.getProperty("es.thread-pool-state.userName");
password = environment.getProperty("es.thread-pool-state.password");
List<HttpHost> hosts = parseHosts();
if (Strings.isNullOrEmpty(userName) || Strings.isNullOrEmpty(password)) {
client = new RestHighLevelClient(RestClient.builder(hosts.toArray(new HttpHost[]{})));
} else {
client = new RestHighLevelClient(RestClient.builder(hosts.toArray(new HttpHost[]{}))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(getCredentialsProvider())));
}
log.info("[ES RestHighLevelClient] success to connect eshost:{},scheme:{}", host, scheme);
return client;
} catch (Exception ex) {
log.error("[ES RestHighLevelClient] fail to connect es! cause:{}", Throwables.getStackTraceAsString(ex));
}
return null;
}
private static BasicCredentialsProvider getCredentialsProvider() {
if (!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(userName, password));
return credentialsProvider;
}
return null;
}
public static RestHighLevelClient getClient() {
return null == client ? initRestClient() : client;
}
private static List<HttpHost> parseHosts() {
String[] hostAndPorts = host.split(",");
List<HttpHost> hosts = Lists.newArrayList();
for (String hostAndPort : hostAndPorts) {
hostAndPort = hostAndPort.trim();
hosts.add(new HttpHost(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1]), scheme));
}
return hosts;
}
}

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.monitor.es;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.monitor.es.model.EsThreadPoolRunStateInfo;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.io.FileUtil;
import com.example.monitor.base.AbstractDynamicThreadPoolMonitor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class EsMonitorHandler extends AbstractDynamicThreadPoolMonitor {
public EsMonitorHandler(ThreadPoolRunStateHandler threadPoolRunStateHandler) {
super(threadPoolRunStateHandler);
}
private AtomicBoolean isIndexExist = null;
@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
EsThreadPoolRunStateInfo esThreadPoolRunStateInfo = new EsThreadPoolRunStateInfo();
BeanUtil.copyProperties(poolRunStateInfo, esThreadPoolRunStateInfo);
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String indexName = environment.getProperty("es.thread-pool-state.index.name", "thread-pool-state");
String applicationName = environment.getProperty("spring.application.name", "application");
if (!this.isExists(indexName)) {
List<String> rawMapping = FileUtil.readLines(new File(Thread.currentThread().getContextClassLoader().getResource("mapping.json").getPath()), StandardCharsets.UTF_8);
String mapping = String.join(" ", rawMapping);
// if index doesn't exsit, this function may try to create one, but recommend to create index manually.
this.createIndex(indexName, "_doc", mapping, null, null, null);
}
esThreadPoolRunStateInfo.setApplicationName(applicationName);
esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis());
this.log2Es(esThreadPoolRunStateInfo, indexName);
}
public void log2Es(EsThreadPoolRunStateInfo esThreadPoolRunStateInfo, String indexName) {
RestHighLevelClient client = EsClientHolder.getClient();
try {
IndexRequest request = new IndexRequest(indexName, "_doc");
request.id(esThreadPoolRunStateInfo.getId());
String stateJson = JSONUtil.toJSONString(esThreadPoolRunStateInfo);
request.source(stateJson, XContentType.JSON);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
log.info("write thread-pool state to es, id is :{}", response.getId());
} catch (Exception ex) {
log.error("es index error, the exception was thrown in create index. name:{},type:{},id:{}. {} ",
indexName,
"_doc",
esThreadPoolRunStateInfo.getId(),
ex);
}
}
public synchronized boolean isExists(String index) {
// cache check result
if (Objects.isNull(isIndexExist)) {
boolean exists = false;
GetIndexRequest request = new GetIndexRequest();
request.indices(index);
try {
RestHighLevelClient client = EsClientHolder.getClient();
exists = client.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("check es index fail");
}
isIndexExist = new AtomicBoolean(exists);
}
return isIndexExist.get();
}
public void createIndex(String index, String type, String mapping, Integer shards, Integer replicas, String alias) {
RestHighLevelClient client = EsClientHolder.getClient();
boolean acknowledged = false;
CreateIndexRequest request = new CreateIndexRequest(index);
if (StringUtils.hasText(mapping)) {
request.mapping(type, mapping, XContentType.JSON);
}
if (!Objects.isNull(shards) && !Objects.isNull(replicas)) {
request.settings(Settings.builder()
.put("index.number_of_shards", shards) // 5
.put("index.number_of_replicas", replicas));// 1
}
if (StringUtils.hasText(alias)) {
request.alias(new Alias(alias));
}
try {
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
acknowledged = createIndexResponse.isAcknowledged();
} catch (IOException e) {
log.error("create es index exception", e);
}
if (acknowledged) {
log.info("create es index success");
isIndexExist.set(true);
} else {
log.error("create es index fail");
throw new RuntimeException("cannot auto create thread-pool state es index");
}
}
@Override
public String getType() {
return "es";
}
}

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.monitor.es.model;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import lombok.Getter;
import lombok.Setter;
/**
* Create by yuyang
* 2022/8/4 17:17
*/
@Getter
@Setter
public class EsThreadPoolRunStateInfo extends ThreadPoolRunStateInfo {
private String Id;
private String applicationName;
}

@ -0,0 +1,78 @@
{
"_doc": {
"properties": {
"activeSize": {
"type": "long"
},
"applicationName": {
"type": "keyword"
},
"clientLastRefreshTime": {
"type": "text"
},
"completedTaskCount": {
"type": "long"
},
"coreSize": {
"type": "long"
},
"currentLoad": {
"type": "text"
},
"freeMemory": {
"type": "text"
},
"host": {
"type": "keyword"
},
"id": {
"type": "text"
},
"largestPoolSize": {
"type": "long"
},
"maximumSize": {
"type": "long"
},
"memoryProportion": {
"type": "text"
},
"peakLoad": {
"type": "text"
},
"poolSize": {
"type": "long"
},
"queueCapacity": {
"type": "long"
},
"queueRemainingCapacity": {
"type": "long"
},
"queueSize": {
"type": "long"
},
"queueType": {
"type": "text"
},
"rejectCount": {
"type": "long"
},
"rejectedName": {
"type": "text"
},
"simpleCurrentLoad": {
"type": "long"
},
"simplePeakLoad": {
"type": "long"
},
"timestamp": {
"type": "date"
},
"tpId": {
"type": "keyword"
}
}
}
}

@ -14,6 +14,7 @@
<module>hippo4j-monitor-base</module>
<module>hippo4j-monitor-log</module>
<module>hippo4j-monitor-prometheus</module>
<module>hippo4j-monitor-es</module>
</modules>
<build>

@ -83,6 +83,12 @@
<artifactId>hippo4j-monitor-prometheus</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-monitor-es</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -59,7 +59,8 @@ import org.springframework.core.annotation.Order;
@Import({
ConfigHandlerConfiguration.EmbeddedNacos.class, ConfigHandlerConfiguration.EmbeddedNacosCloud.class,
ConfigHandlerConfiguration.EmbeddedApollo.class, ConfigHandlerConfiguration.EmbeddedZookeeper.class,
MonitorHandlerConfiguration.EmbeddedLogMonitor.class, MonitorHandlerConfiguration.EmbeddedPrometheusMonitor.class
MonitorHandlerConfiguration.EmbeddedLogMonitor.class, MonitorHandlerConfiguration.EmbeddedPrometheusMonitor.class,
MonitorHandlerConfiguration.EmbeddedEsMonitor.class
})
public class DynamicThreadPoolCoreAutoConfiguration {

@ -18,8 +18,10 @@
package cn.hippo4j.core.springboot.starter.config;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.springboot.starter.config.condition.EsMonitorCondition;
import cn.hippo4j.core.springboot.starter.config.condition.LogMonitorCondition;
import cn.hippo4j.core.springboot.starter.config.condition.PrometheusMonitorCondition;
import cn.hippo4j.monitor.es.EsMonitorHandler;
import cn.hippo4j.monitor.log.LogMonitorHandler;
import cn.hippo4j.monitor.prometheus.PrometheusMonitorHandler;
import org.springframework.context.annotation.Bean;
@ -49,4 +51,13 @@ public class MonitorHandlerConfiguration {
return new PrometheusMonitorHandler(threadPoolRunStateHandler);
}
}
@Conditional(EsMonitorCondition.class)
static class EmbeddedEsMonitor {
@Bean
public EsMonitorHandler EsMonitorHandler(ThreadPoolRunStateHandler threadPoolRunStateHandler) {
return new EsMonitorHandler(threadPoolRunStateHandler);
}
}
}

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.springboot.starter.config.condition;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import com.example.monitor.base.MonitorTypeEnum;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
/**
* Prometheus monitor condition.
*/
public class EsMonitorCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
String collectType = context.getEnvironment().getProperty(BootstrapCoreProperties.PREFIX + ".collect-type", "");
return StringUtil.isNotEmpty(collectType) && collectType.contains(MonitorTypeEnum.ES.name().toLowerCase()) ? true : false;
}
}

@ -102,6 +102,12 @@
<artifactId>hippo4j-monitor-prometheus</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-monitor-es</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -112,7 +112,8 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
new Integer(collectType.split(",").length),
ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.collect.data").build());
if (collectType.contains(MonitorTypeEnum.PROMETHEUS.name().toLowerCase())
|| collectType.contains(MonitorTypeEnum.LOG.name().toLowerCase())) {
|| collectType.contains(MonitorTypeEnum.LOG.name().toLowerCase())
|| collectType.contains(MonitorTypeEnum.ES.name().toLowerCase())) {
// Get all dynamic thread pool monitoring components.
threadPoolMonitors = ApplicationContextHolder.getBeansOfType(ThreadPoolMonitor.class);
collectVesselExecutor.scheduleWithFixedDelay(

@ -17,6 +17,7 @@
package cn.hippo4j.springboot.starter.remote;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.core.ShutdownExecuteException;
import cn.hippo4j.core.executor.support.ThreadFactoryBuilder;
@ -47,6 +48,11 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali
*/
private volatile boolean healthStatus = true;
/**
* Health check failure count
*/
private volatile int checkFailureCount = 0;
/**
* Client shutdown hook
*/
@ -89,11 +95,18 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali
if (healthCheckStatus) {
if (Objects.equals(healthStatus, false)) {
healthStatus = true;
checkFailureCount = 0;
log.info("The client reconnects to the server successfully.");
signalAllBizThread();
}
} else {
healthStatus = false;
checkFailureCount++;
if (checkFailureCount > 1 && checkFailureCount < 4) {
ThreadUtil.sleep(HEALTH_CHECK_INTERVAL * 1000 * (checkFailureCount - 1));
} else if (checkFailureCount >= 4) {
ThreadUtil.sleep(25000L);
}
}
}

@ -57,10 +57,12 @@
<spring-cloud-starter-stream-rocketmq.version>2.2.6.RELEASE</spring-cloud-starter-stream-rocketmq.version>
<spring-cloud-starter-netflix-hystrix.version>2.2.9.RELEASE</spring-cloud-starter-netflix-hystrix.version>
<spring-cloud-starter-alibaba-nacos-config.version>2.2.5.RELEASE</spring-cloud-starter-alibaba-nacos-config.version>
<elasticsearch.version>6.5.0</elasticsearch.version>
<!-- Maven & Build -->
<java.version>1.8</java.version>
<skip.gpg>true</skip.gpg>
<skip.spotless.apply>false</skip.spotless.apply>
<maven.javadoc.failOnError>false</maven.javadoc.failOnError>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -381,6 +383,7 @@
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotless-maven-plugin.version}</version>
<configuration>
<applySkip>${skip.spotless.apply}</applySkip>
<java>
<eclipse>
<file>${maven.multiModuleProjectDirectory}/dev-support/hippo4j_spotless_formatter.xml</file>

Loading…
Cancel
Save