Merge branch 'develop' into check-console

pull/930/head
李金来 3 years ago committed by GitHub
commit 84c2172f0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -135,6 +135,13 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
</a>
</td></tr>
<tr>
<td align="center">
<a href="https://github.com/wulangcode">
<img src="https://avatars.githubusercontent.com/u/48200100?v=4" width="50;" alt="wulangcode"/>
<br />
<sub><b>WuLang</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/Gdk666">
<img src="https://avatars.githubusercontent.com/u/22442067?v=4" width="50;" alt="Gdk666"/>
@ -149,13 +156,6 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
<sub><b>黄成兴</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/wulangcode">
<img src="https://avatars.githubusercontent.com/u/48200100?v=4" width="50;" alt="wulangcode"/>
<br />
<sub><b>WuLang</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/xqxyxchy">
<img src="https://avatars.githubusercontent.com/u/21134578?v=4" width="50;" alt="xqxyxchy"/>
@ -419,6 +419,13 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
<sub><b>Malcolm</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/PleasePerfunctory">
<img src="https://avatars.githubusercontent.com/u/47876885?v=4" width="50;" alt="PleasePerfunctory"/>
<br />
<sub><b>Null</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/alexhaoxuan">
<img src="https://avatars.githubusercontent.com/u/46749051?v=4" width="50;" alt="alexhaoxuan"/>
@ -446,15 +453,15 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
<br />
<sub><b>Zhenye</b></sub>
</a>
</td>
</td></tr>
<tr>
<td align="center">
<a href="https://github.com/dongming0920">
<img src="https://avatars.githubusercontent.com/u/57832778?v=4" width="50;" alt="dongming0920"/>
<br />
<sub><b>Null</b></sub>
</a>
</td></tr>
<tr>
</td>
<td align="center">
<a href="https://github.com/f497196689">
<img src="https://avatars.githubusercontent.com/u/15325854?v=4" width="50;" alt="f497196689"/>
@ -510,15 +517,15 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
<br />
<sub><b>Hui Cao</b></sub>
</a>
</td>
</td></tr>
<tr>
<td align="center">
<a href="https://github.com/kongyanbo-cx">
<img src="https://avatars.githubusercontent.com/u/58963923?v=4" width="50;" alt="kongyanbo-cx"/>
<br />
<sub><b>Null</b></sub>
</a>
</td></tr>
<tr>
</td>
<td align="center">
<a href="https://github.com/lishiyu">
<img src="https://avatars.githubusercontent.com/u/36871640?v=4" width="50;" alt="lishiyu"/>
@ -574,15 +581,15 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
<br />
<sub><b>Null</b></sub>
</a>
</td>
</td></tr>
<tr>
<td align="center">
<a href="https://github.com/yangzhiw">
<img src="https://avatars.githubusercontent.com/u/13634974?v=4" width="50;" alt="yangzhiw"/>
<br />
<sub><b>Opentanent</b></sub>
</a>
</td></tr>
<tr>
</td>
<td align="center">
<a href="https://github.com/yhc777">
<img src="https://avatars.githubusercontent.com/u/71164753?v=4" width="50;" alt="yhc777"/>

@ -28,4 +28,5 @@
<suppress checks="MagicNumber" files="DubboThreadPoolAdapter.java"/>
<suppress checks="MagicNumber" files="UndertowWebThreadPoolHandler.java"/>
<suppress checks="MagicNumber" files="DashboardServiceImpl.java"/>
<suppress checks="MagicNumber" files="DefaultThreadPoolCheckAlarmHandler.java"/>
</suppressions>

@ -72,6 +72,13 @@ sidebar_position: 1
<td align="center" ><a href="https://blog.xiajibagao.top">Createsequence's Blog</a></td>
<td align="center" >841396397@qq.com</td>
</tr>
<tr>
<td align="center"><a href="https://github.com/wulangcode"><img src="https://avatars.githubusercontent.com/u/48200100?v=4" width="64px;"/></a></td>
<td align="center">吴浪</td>
<td align="center" ><a href="https://github.com/wulangcode">wulangcode</a></td>
<td align="center" >-</td>
<td align="center" >sanliangitch@foxmail.com</td>
</tr>
</table>
## 贡献者

@ -17,46 +17,62 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.plugin.PluginRuntime;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* Record task execution time indicator.
* <p>Record task execution time indicator. <br />
* The initialization size of the timer container can be specified during construction,
* It will route it to different timers in the container according to the {@link Thread#getId},
* to reduce the lock competition strength for a single timer.
*/
@RequiredArgsConstructor
public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
private static final int MAXIMUM_CAPACITY = 1 << 30;
public static final String PLUGIN_NAME = "task-time-record-plugin";
/**
* Lock instance
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks
* modulo
*/
private long totalTaskTimeMillis = 0L;
private final int modulo;
/**
* Maximum task milli execution time, default -1
* timers
*/
private long maxTaskTimeMillis = -1L;
public final Timer[] timerTable;
/**
* Minimal task milli execution time, default -1
* Create a {@link TaskTimeRecordPlugin}
*
* @param initialCapacity initial capacity of timer table
*/
private long minTaskTimeMillis = -1L;
public TaskTimeRecordPlugin(int initialCapacity) {
Assert.isTrue(initialCapacity >= 1, "count must great then 0");
initialCapacity = tableSizeFor(initialCapacity);
timerTable = (Timer[]) Array.newInstance(Timer.class, initialCapacity);
for (int i = 0; i < timerTable.length; i++) {
timerTable[i] = new Timer();
}
modulo = initialCapacity - 1;
}
/**
* Count of completed task
* Create a {@link TaskTimeRecordPlugin}
*/
private long taskCount = 0L;
public TaskTimeRecordPlugin() {
this(1);
}
/**
* Get id.
@ -91,15 +107,110 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
*/
@Override
protected void processTaskTime(long taskExecuteTime) {
Timer timer = getTimerForCurrentThread();
timer.recordTaskTime(taskExecuteTime);
}
/**
* Get the summary statistics of the instance at the current time.
*
* @return data snapshot
*/
public Summary summarize() {
// ignore unused timers
List<Summary> summaries = Arrays.stream(timerTable)
.map(Timer::summarize)
.filter(s -> s.getTaskCount() > 0)
.collect(Collectors.toList());
// summarize data
long totalTaskTimeMillis = 0L;
long maxTaskTimeMillis = -1L;
long minTaskTimeMillis = -1L;
long taskCount = 0L;
for (Summary summary : summaries) {
if (taskCount > 0) {
maxTaskTimeMillis = Math.max(maxTaskTimeMillis, summary.getMaxTaskTimeMillis());
minTaskTimeMillis = Math.min(minTaskTimeMillis, summary.getMinTaskTimeMillis());
} else {
maxTaskTimeMillis = summary.getMaxTaskTimeMillis();
minTaskTimeMillis = summary.getMinTaskTimeMillis();
}
totalTaskTimeMillis += summary.getTotalTaskTimeMillis();
taskCount += summary.getTaskCount();
}
return new Summary(totalTaskTimeMillis, maxTaskTimeMillis, minTaskTimeMillis, taskCount);
}
private Timer getTimerForCurrentThread() {
/*
* use table tableSize - 1 to take modulus for tid, and the remainder obtained is the subscript of the timer corresponding to the thread in the table. eg: tid = 10086, tableSize = 8, then we
* get 10086 & (8 - 1) = 4
*/
long threadId = Thread.currentThread().getId();
int index = (int) (threadId & modulo);
return timerTable[index];
}
/**
* copy from {@link HashMap#tableSizeFor}
*/
static int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return n >= MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : n + 1;
}
/**
* <p>Independent unit for providing time recording function.<br />
* Support thread-safe operations when reading and writing in a concurrent environment.
*/
private static class Timer {
/**
* Lock instance
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks
*/
private long totalTaskTimeMillis = 0L;
/**
* Maximum task milli execution time, default -1
*/
private long maxTaskTimeMillis = -1L;
/**
* Minimal task milli execution time, default -1
*/
private long minTaskTimeMillis = -1L;
/**
* Count of completed task
*/
private long taskCount = 0L;
/**
* Record task execute time.
*
* @param taskExecuteTime task execute time
*/
public void recordTaskTime(long taskExecuteTime) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (taskCount == 0) {
maxTaskTimeMillis = taskExecuteTime;
minTaskTimeMillis = taskExecuteTime;
} else {
if (taskCount > 0) {
maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
} else {
maxTaskTimeMillis = taskExecuteTime;
minTaskTimeMillis = taskExecuteTime;
}
taskCount = taskCount + 1;
totalTaskTimeMillis += taskExecuteTime;
@ -129,6 +240,8 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
return statistics;
}
}
/**
* Summary statistics of SyncTimeRecorder instance at a certain time.
*/
@ -166,4 +279,5 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1;
}
}
}

@ -20,6 +20,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
/**
* test for {@link TaskTimeRecordPlugin}
*/
@Slf4j
public class TaskTimeRecordPluginTest {
@Test
@ -49,20 +51,29 @@ public class TaskTimeRecordPluginTest {
3, 3, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin();
TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(3);
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(1000L));
executor.submit(() -> ThreadUtil.sleep(3000L));
executor.submit(() -> ThreadUtil.sleep(2000L));
executor.submit(() -> ThreadUtil.sleep(2000L));
// waiting for shutdown
executor.shutdown();
while (!executor.isTerminated()) {
}
TaskTimeRecordPlugin.Summary summary = plugin.summarize();
Assert.assertEquals(1, summary.getMinTaskTimeMillis() / 1000L);
Assert.assertEquals(3, summary.getMaxTaskTimeMillis() / 1000L);
Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 1000L);
Assert.assertEquals(6, summary.getTotalTaskTimeMillis() / 1000L);
Assert.assertTrue(testInDeviation(summary.getMinTaskTimeMillis(), 1000L, 300L));
Assert.assertTrue(testInDeviation(summary.getMaxTaskTimeMillis(), 3000L, 300L));
Assert.assertTrue(testInDeviation(summary.getAvgTaskTimeMillis(), 2000L, 300L));
Assert.assertTrue(testInDeviation(summary.getTotalTaskTimeMillis(), 8000L, 300L));
}
private boolean testInDeviation(long except, long actual, long offer) {
long exceptLower = except - offer;
long exceptUpper = except + offer;
log.info("test {} < [{}] < {}", exceptLower, actual, exceptUpper);
return exceptLower < actual && actual < exceptUpper;
}
}

@ -18,7 +18,9 @@
package cn.hippo4j.message.dto;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
/**
* Alarm control DTO.

@ -70,6 +70,9 @@ public class WeChatSendMessageHandler extends AbstractRobotSendMessageHandler {
}
}
/**
* WeChat
*/
@Data
@Accessors(chain = true)
public static class WeChatReqDTO {
@ -79,6 +82,9 @@ public class WeChatSendMessageHandler extends AbstractRobotSendMessageHandler {
private Markdown markdown;
}
/**
* Markdown
*/
@Data
public static class Markdown {

@ -46,8 +46,8 @@ public class DingAlarmConstants {
* Replace task timeout template
*/
public static final String DING_ALARM_TIMEOUT_REPLACE_TXT =
"<font color=#708090 size=2>任务执行时间:%d / ms </font> \n\n " +
"<font color=#708090 size=2>超时时间:%d / ms</font> \n\n " +
DING_ALARM_TIMEOUT_TRACE_REPLACE_TXT +
" --- \n\n ";
"<font color=#708090 size=2>任务执行时间:%d / ms </font> \n\n "
+ "<font color=#708090 size=2>超时时间:%d / ms</font> \n\n "
+ DING_ALARM_TIMEOUT_TRACE_REPLACE_TXT
+ " --- \n\n ";
}

@ -36,7 +36,7 @@ public class WeChatAlarmConstants {
* Replace task timeout template
*/
public static final String WE_CHAT_ALARM_TIMOUT_REPLACE_TXT =
"\n> 任务执行时间:%s / ms \n" +
"> 超时时间:%s / ms " +
WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT;
"\n> 任务执行时间:%s / ms \n"
+ "> 超时时间:%s / ms "
+ WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT;
}

@ -64,11 +64,11 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
@Value("${spring.dynamic.thread-pool.check-state-interval:5}")
private Integer checkStateInterval;
private final ScheduledExecutorService ALARM_NOTIFY_EXECUTOR = new ScheduledThreadPoolExecutor(
private final ScheduledExecutorService alarmNotifyExecutor = new ScheduledThreadPoolExecutor(
1,
r -> new Thread(r, "client.alarm.notify"));
private final ExecutorService ASYNC_ALARM_NOTIFY_EXECUTOR = ThreadPoolBuilder.builder()
private final ExecutorService asyncAlarmNotifyExecutor = ThreadPoolBuilder.builder()
.poolThreadSize(2, 4)
.threadFactory("client.execute.timeout.alarm")
.allowCoreThreadTimeOut(true)
@ -79,7 +79,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
@Override
public void run(String... args) throws Exception {
ALARM_NOTIFY_EXECUTOR.scheduleWithFixedDelay(this, 0, checkStateInterval, TimeUnit.SECONDS);
alarmNotifyExecutor.scheduleWithFixedDelay(this, 0, checkStateInterval, TimeUnit.SECONDS);
}
@Override
@ -162,7 +162,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest);
}
};
ASYNC_ALARM_NOTIFY_EXECUTOR.execute(checkPoolRejectedAlarmTask);
asyncAlarmNotifyExecutor.execute(checkPoolRejectedAlarmTask);
}
/**
@ -190,7 +190,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace);
}
Runnable task = () -> hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest);
ASYNC_ALARM_NOTIFY_EXECUTOR.execute(task);
asyncAlarmNotifyExecutor.execute(task);
} catch (Throwable ex) {
log.error("Send thread pool execution timeout alarm error.", ex);
}

@ -46,7 +46,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
private final AlarmControlHandler alarmControlHandler;
@Getter
public final Map<String, List<NotifyConfigDTO>> notifyConfigs = new HashMap<>();
private final Map<String, List<NotifyConfigDTO>> notifyConfigs = new HashMap<>();
private final Map<String, SendMessageHandler> sendMessageHandlers = new HashMap<>();

@ -25,6 +25,8 @@ import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor;
import cn.hippo4j.monitor.base.MonitorTypeEnum;
import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -63,7 +65,7 @@ public class DynamicThreadPoolElasticSearchMonitorHandler extends AbstractDynami
List<String> rawMapping = FileUtil.readLines(Thread.currentThread().getContextClassLoader().getResource("mapping.json").getPath(), StandardCharsets.UTF_8);
String mapping = String.join(" ", rawMapping);
// if index doesn't exsit, this function may try to create one, but recommend to create index manually.
this.createIndex(indexName, "_doc", mapping, null, null, null);
this.createIndex(EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build());
}
esThreadPoolRunStateInfo.setApplicationName(applicationName);
esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis());
@ -104,20 +106,20 @@ public class DynamicThreadPoolElasticSearchMonitorHandler extends AbstractDynami
return isIndexExist.get();
}
public void createIndex(String index, String type, String mapping, Integer shards, Integer replicas, String alias) {
public void createIndex(EsIndex esIndex) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
boolean acknowledged = false;
CreateIndexRequest request = new CreateIndexRequest(index);
if (StringUtils.hasText(mapping)) {
request.mapping(type, mapping, XContentType.JSON);
CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex());
if (StringUtils.hasText(esIndex.getMapping())) {
request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON);
}
if (!Objects.isNull(shards) && !Objects.isNull(replicas)) {
if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) {
request.settings(Settings.builder()
.put("index.number_of_shards", shards) // 5
.put("index.number_of_replicas", replicas));// 1
.put("index.number_of_shards", esIndex.getShards())
.put("index.number_of_replicas", esIndex.getReplicas()));
}
if (StringUtils.hasText(alias)) {
request.alias(new Alias(alias));
if (StringUtils.hasText(esIndex.getAlias())) {
request.alias(new Alias(esIndex.getAlias()));
}
try {
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
@ -138,4 +140,18 @@ public class DynamicThreadPoolElasticSearchMonitorHandler extends AbstractDynami
public String getType() {
return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase();
}
/**
* Es Index
*/
@Getter
@Builder
private static class EsIndex {
String index;
String type;
String mapping;
Integer shards;
Integer replicas;
String alias;
}
}

@ -28,8 +28,6 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.core.env.Environment;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
@ -66,9 +64,7 @@ public class ElasticSearchClientHolder {
log.info("[ES RestHighLevelClient] success to connect eshost:{},scheme:{}", host, scheme);
return client;
} catch (Exception ex) {
StringWriter stackTrace = new StringWriter();
ex.printStackTrace(new PrintWriter(stackTrace));
log.error("[ES RestHighLevelClient] fail to connect es! cause:{}", stackTrace);
log.error("[ES RestHighLevelClient] fail to connect es! cause:", ex);
}
return null;
}

@ -28,7 +28,7 @@ import lombok.Setter;
@Setter
public class ElasticSearchThreadPoolRunStateInfo extends ThreadPoolRunStateInfo {
private String Id;
private String id;
private String applicationName;
}

@ -35,21 +35,21 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class AdapterThreadPoolMicrometerMonitorHandler extends AbstractAdapterThreadPoolMonitor {
private final static String METRIC_NAME_PREFIX = "adapter.thread-pool";
private static final String METRIC_NAME_PREFIX = "adapter.thread-pool";
private final static String ADAPTER_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
private static final String ADAPTER_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
private final static String APPLICATION_NAME_TAG = "application.name";
private static final String APPLICATION_NAME_TAG = "application.name";
private final Map<String, ThreadPoolAdapterState> RUN_STATE_CACHE = new ConcurrentHashMap<>();
private final Map<String, ThreadPoolAdapterState> runStateCache = new ConcurrentHashMap<>();
@Override
protected void execute(ThreadPoolAdapterState threadPoolAdapterState) {
ThreadPoolAdapterState stateInfo = RUN_STATE_CACHE.get(threadPoolAdapterState.getThreadPoolKey());
ThreadPoolAdapterState stateInfo = runStateCache.get(threadPoolAdapterState.getThreadPoolKey());
if (stateInfo != null) {
BeanUtil.convert(threadPoolAdapterState, stateInfo);
} else {
RUN_STATE_CACHE.put(threadPoolAdapterState.getThreadPoolKey(), threadPoolAdapterState);
runStateCache.put(threadPoolAdapterState.getThreadPoolKey(), threadPoolAdapterState);
}
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String applicationName = environment.getProperty("spring.application.name", "application");

@ -35,21 +35,21 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class DynamicThreadPoolMicrometerMonitorHandler extends AbstractDynamicThreadPoolMonitor {
private final static String METRIC_NAME_PREFIX = "dynamic.thread-pool";
private static final String METRIC_NAME_PREFIX = "dynamic.thread-pool";
private final static String DYNAMIC_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
private static final String DYNAMIC_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
private final static String APPLICATION_NAME_TAG = "application.name";
private static final String APPLICATION_NAME_TAG = "application.name";
private final Map<String, ThreadPoolRunStateInfo> RUN_STATE_CACHE = new ConcurrentHashMap<>();
private final Map<String, ThreadPoolRunStateInfo> runStateCache = new ConcurrentHashMap<>();
@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
ThreadPoolRunStateInfo stateInfo = RUN_STATE_CACHE.get(poolRunStateInfo.getTpId());
ThreadPoolRunStateInfo stateInfo = runStateCache.get(poolRunStateInfo.getTpId());
if (stateInfo != null) {
BeanUtil.convert(poolRunStateInfo, stateInfo);
} else {
RUN_STATE_CACHE.put(poolRunStateInfo.getTpId(), poolRunStateInfo);
runStateCache.put(poolRunStateInfo.getTpId(), poolRunStateInfo);
}
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String applicationName = environment.getProperty("spring.application.name", "application");

@ -35,21 +35,21 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class WebThreadPoolMicrometerMonitorHandler extends AbstractWebThreadPoolMonitor {
private final static String METRIC_NAME_PREFIX = "web.thread-pool";
private static final String METRIC_NAME_PREFIX = "web.thread-pool";
private final static String APPLICATION_NAME_TAG = "application.name";
private static final String APPLICATION_NAME_TAG = "application.name";
private final Map<String, ThreadPoolRunStateInfo> RUN_STATE_CACHE = new ConcurrentHashMap<>();
private final Map<String, ThreadPoolRunStateInfo> runStateCache = new ConcurrentHashMap<>();
@Override
protected void execute(ThreadPoolRunStateInfo webThreadPoolRunStateInfo) {
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String applicationName = environment.getProperty("spring.application.name", "application");
ThreadPoolRunStateInfo stateInfo = RUN_STATE_CACHE.get(applicationName);
ThreadPoolRunStateInfo stateInfo = runStateCache.get(applicationName);
if (stateInfo != null) {
BeanUtil.convert(webThreadPoolRunStateInfo, stateInfo);
} else {
RUN_STATE_CACHE.put(applicationName, webThreadPoolRunStateInfo);
runStateCache.put(applicationName, webThreadPoolRunStateInfo);
}
Iterable<Tag> tags = CollectionUtil.newArrayList(Tag.of(APPLICATION_NAME_TAG, applicationName));
Metrics.gauge(metricName("current.load"), tags, webThreadPoolRunStateInfo, ThreadPoolRunStateInfo::getSimpleCurrentLoad);

@ -22,6 +22,9 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* ServerApplication
*/
@EnableTransactionManagement
@SpringBootApplication(scanBasePackages = "cn.hippo4j")
@MapperScan(basePackages = {"cn.hippo4j.config.mapper", "cn.hippo4j.auth.mapper"})

@ -35,7 +35,8 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Objects;
/**

@ -25,7 +25,12 @@ import cn.hippo4j.discovery.core.InstanceRegistry;
import cn.hippo4j.discovery.core.Lease;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;

@ -21,11 +21,13 @@ import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import cn.hippo4j.common.design.observer.AbstractSubjectCenter;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.InstanceInfo.InstanceStatus;
import cn.hippo4j.common.toolkit.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
@ -48,26 +50,21 @@ import static cn.hippo4j.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM;
@Service
public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
private final int containerSize = 1024;
private static final int CONTAINER_SIZE = 1024;
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap(containerSize);
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<>(CONTAINER_SIZE);
@Override
public List<Lease<InstanceInfo>> listInstance(String appName) {
Map<String, Lease<InstanceInfo>> appNameLeaseMap = registry.get(appName);
if (CollectionUtils.isEmpty(appNameLeaseMap)) {
return new ArrayList<>();
}
List<Lease<InstanceInfo>> appNameLeaseList = new ArrayList<>();
appNameLeaseMap.values().forEach(each -> appNameLeaseList.add(each));
return appNameLeaseList;
return CollectionUtils.isEmpty(appNameLeaseMap) ? Collections.emptyList() : new ArrayList<>(appNameLeaseMap.values());
}
@Override
public void register(InstanceInfo registrant) {
Map<String, Lease<InstanceInfo>> registerMap = registry.get(registrant.getAppName());
if (registerMap == null) {
ConcurrentHashMap<String, Lease<InstanceInfo>> registerNewMap = new ConcurrentHashMap(12);
ConcurrentHashMap<String, Lease<InstanceInfo>> registerNewMap = new ConcurrentHashMap<>();
registerMap = registry.putIfAbsent(registrant.getAppName(), registerNewMap);
if (registerMap == null) {
registerMap = registerNewMap;
@ -98,8 +95,11 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
String appName = instanceRenew.getAppName();
String instanceId = instanceRenew.getInstanceId();
Map<String, Lease<InstanceInfo>> registryMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew;
if (registryMap == null || (leaseToRenew = registryMap.get(instanceId)) == null) {
if (registryMap == null) {
return false;
}
Lease<InstanceInfo> leaseToRenew = registryMap.get(instanceId);
if (leaseToRenew == null) {
return false;
}
leaseToRenew.renew();
@ -111,20 +111,20 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
String appName = info.getAppName();
String instanceId = info.getInstanceId();
Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName);
if (CollectionUtils.isEmpty(leaseMap)) {
log.warn("Failed to remove unhealthy node, no application found: {}", appName);
return;
}
if (CollectionUtil.isNotEmpty(leaseMap)) {
Lease<InstanceInfo> remove = leaseMap.remove(instanceId);
if (remove == null) {
if (remove != null) {
log.info("Remove unhealthy node, node ID: {}", instanceId);
} else {
log.warn("Failed to remove unhealthy node, no instance found: {}", instanceId);
return;
}
log.info("Remove unhealthy node, node ID: {}", instanceId);
} else {
log.warn("Failed to remove unhealthy node, no application found: {}", appName);
}
}
public void evict(long additionalLeaseMs) {
List<Lease<InstanceInfo>> expiredLeases = new ArrayList();
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Map.Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
@ -146,7 +146,7 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
protected boolean internalCancel(String appName, String id, String identify) {
Map<String, Lease<InstanceInfo>> registerMap = registry.get(appName);
if (!CollectionUtils.isEmpty(registerMap)) {
if (CollectionUtil.isNotEmpty(registerMap)) {
registerMap.remove(id);
AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, () -> identify);
log.info("Clean up unhealthy nodes. Node id: {}", id);
@ -154,6 +154,9 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
return true;
}
/**
* EvictionTask
*/
public class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
@ -177,7 +180,7 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
}
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - EVICTION_INTERVAL_TIMER_IN_MS;
return compensationTime <= 0L ? 0L : compensationTime;
return Math.max(compensationTime, 0L);
}
long getCurrentTimeNano() {
@ -193,7 +196,7 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
.daemon(true)
.build());
private final AtomicReference<EvictionTask> evictionTaskRef = new AtomicReference();
private final AtomicReference<EvictionTask> evictionTaskRef = new AtomicReference<>();
public void postInit() {
evictionTaskRef.set(new BaseInstanceRegistry.EvictionTask());

@ -42,13 +42,13 @@ public class Lease<T> {
private long duration;
public static final int DEFAULT_DURATION_IN_SECS = 90;
public static final long DEFAULT_DURATION_IN_SECS = 90 * 1000L;
public Lease(T r) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
duration = DEFAULT_DURATION_IN_SECS * 1000;
duration = DEFAULT_DURATION_IN_SECS;
}
public void renew() {

Loading…
Cancel
Save