diff --git a/README.md b/README.md
index e75f43eb..fb55d867 100644
--- a/README.md
+++ b/README.md
@@ -135,6 +135,13 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
+
+
+
+
+ WuLang
+
+ |
@@ -149,13 +156,6 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
黄成兴
|
-
-
-
-
- WuLang
-
- |
@@ -419,6 +419,13 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
Malcolm
|
+
+
+
+
+ Null
+
+ |
@@ -446,15 +453,15 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
Zhenye
- |
+
+
Null
- |
-
+
@@ -510,15 +517,15 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
Hui Cao
- |
+
+
Null
- |
-
+
@@ -574,15 +581,15 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
Null
- |
+
+
Opentanent
- |
-
+
diff --git a/dev-support/hippo4j_checkstyle_suppression.xml b/dev-support/hippo4j_checkstyle_suppression.xml
index 1fc7f035..765a781b 100644
--- a/dev-support/hippo4j_checkstyle_suppression.xml
+++ b/dev-support/hippo4j_checkstyle_suppression.xml
@@ -28,4 +28,5 @@
+
diff --git a/docs/src/pages/team.md b/docs/src/pages/team.md
index 6c242062..65609fba 100644
--- a/docs/src/pages/team.md
+++ b/docs/src/pages/team.md
@@ -72,6 +72,13 @@ sidebar_position: 1
| Createsequence's Blog |
841396397@qq.com |
+
+  |
+ 吴浪 |
+ wulangcode |
+ - |
+ sanliangitch@foxmail.com |
+
## 贡献者
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java
index dd6ad982..0c3f2bf5 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java
@@ -17,46 +17,62 @@
package cn.hippo4j.core.plugin.impl;
+import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.plugin.PluginRuntime;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
/**
- * Record task execution time indicator.
+ * Record task execution time indicator.
+ * The initialization size of the timer container can be specified during construction,
+ * It will route it to different timers in the container according to the {@link Thread#getId},
+ * to reduce the lock competition strength for a single timer.
*/
-@RequiredArgsConstructor
public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
+ private static final int MAXIMUM_CAPACITY = 1 << 30;
public static final String PLUGIN_NAME = "task-time-record-plugin";
/**
- * Lock instance
+ * modulo
*/
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final int modulo;
/**
- * Total execution milli time of all tasks
+ * timers
*/
- private long totalTaskTimeMillis = 0L;
+ public final Timer[] timerTable;
/**
- * Maximum task milli execution time, default -1
- */
- private long maxTaskTimeMillis = -1L;
-
- /**
- * Minimal task milli execution time, default -1
+ * Create a {@link TaskTimeRecordPlugin}
+ *
+ * @param initialCapacity initial capacity of timer table
*/
- private long minTaskTimeMillis = -1L;
+ public TaskTimeRecordPlugin(int initialCapacity) {
+ Assert.isTrue(initialCapacity >= 1, "count must great then 0");
+ initialCapacity = tableSizeFor(initialCapacity);
+ timerTable = (Timer[]) Array.newInstance(Timer.class, initialCapacity);
+ for (int i = 0; i < timerTable.length; i++) {
+ timerTable[i] = new Timer();
+ }
+ modulo = initialCapacity - 1;
+ }
/**
- * Count of completed task
+ * Create a {@link TaskTimeRecordPlugin}
*/
- private long taskCount = 0L;
+ public TaskTimeRecordPlugin() {
+ this(1);
+ }
/**
* Get id.
@@ -91,21 +107,8 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
*/
@Override
protected void processTaskTime(long taskExecuteTime) {
- Lock writeLock = lock.writeLock();
- writeLock.lock();
- try {
- if (taskCount == 0) {
- maxTaskTimeMillis = taskExecuteTime;
- minTaskTimeMillis = taskExecuteTime;
- } else {
- maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
- minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
- }
- taskCount = taskCount + 1;
- totalTaskTimeMillis += taskExecuteTime;
- } finally {
- writeLock.unlock();
- }
+ Timer timer = getTimerForCurrentThread();
+ timer.recordTaskTime(taskExecuteTime);
}
/**
@@ -114,19 +117,129 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
* @return data snapshot
*/
public Summary summarize() {
- Lock readLock = lock.readLock();
- Summary statistics;
- readLock.lock();
- try {
- statistics = new Summary(
- this.totalTaskTimeMillis,
- this.maxTaskTimeMillis,
- this.minTaskTimeMillis,
- this.taskCount);
- } finally {
- readLock.unlock();
+ // ignore unused timers
+ List summaries = Arrays.stream(timerTable)
+ .map(Timer::summarize)
+ .filter(s -> s.getTaskCount() > 0)
+ .collect(Collectors.toList());
+
+ // summarize data
+ long totalTaskTimeMillis = 0L;
+ long maxTaskTimeMillis = -1L;
+ long minTaskTimeMillis = -1L;
+ long taskCount = 0L;
+ for (Summary summary : summaries) {
+ if (taskCount > 0) {
+ maxTaskTimeMillis = Math.max(maxTaskTimeMillis, summary.getMaxTaskTimeMillis());
+ minTaskTimeMillis = Math.min(minTaskTimeMillis, summary.getMinTaskTimeMillis());
+ } else {
+ maxTaskTimeMillis = summary.getMaxTaskTimeMillis();
+ minTaskTimeMillis = summary.getMinTaskTimeMillis();
+ }
+ totalTaskTimeMillis += summary.getTotalTaskTimeMillis();
+ taskCount += summary.getTaskCount();
+ }
+ return new Summary(totalTaskTimeMillis, maxTaskTimeMillis, minTaskTimeMillis, taskCount);
+ }
+
+ private Timer getTimerForCurrentThread() {
+ /*
+ * use table tableSize - 1 to take modulus for tid, and the remainder obtained is the subscript of the timer corresponding to the thread in the table. eg: tid = 10086, tableSize = 8, then we
+ * get 10086 & (8 - 1) = 4
+ */
+ long threadId = Thread.currentThread().getId();
+ int index = (int) (threadId & modulo);
+ return timerTable[index];
+ }
+
+ /**
+ * copy from {@link HashMap#tableSizeFor}
+ */
+ static int tableSizeFor(int cap) {
+ int n = cap - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return n >= MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : n + 1;
+ }
+
+ /**
+ * Independent unit for providing time recording function.
+ * Support thread-safe operations when reading and writing in a concurrent environment.
+ */
+ private static class Timer {
+
+ /**
+ * Lock instance
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * Total execution milli time of all tasks
+ */
+ private long totalTaskTimeMillis = 0L;
+
+ /**
+ * Maximum task milli execution time, default -1
+ */
+ private long maxTaskTimeMillis = -1L;
+
+ /**
+ * Minimal task milli execution time, default -1
+ */
+ private long minTaskTimeMillis = -1L;
+
+ /**
+ * Count of completed task
+ */
+ private long taskCount = 0L;
+
+ /**
+ * Record task execute time.
+ *
+ * @param taskExecuteTime task execute time
+ */
+ public void recordTaskTime(long taskExecuteTime) {
+ Lock writeLock = lock.writeLock();
+ writeLock.lock();
+ try {
+ if (taskCount > 0) {
+ maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
+ minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
+ } else {
+ maxTaskTimeMillis = taskExecuteTime;
+ minTaskTimeMillis = taskExecuteTime;
+ }
+ taskCount = taskCount + 1;
+ totalTaskTimeMillis += taskExecuteTime;
+ } finally {
+ writeLock.unlock();
+ }
}
- return statistics;
+
+ /**
+ * Get the summary statistics of the instance at the current time.
+ *
+ * @return data snapshot
+ */
+ public Summary summarize() {
+ Lock readLock = lock.readLock();
+ Summary statistics;
+ readLock.lock();
+ try {
+ statistics = new Summary(
+ this.totalTaskTimeMillis,
+ this.maxTaskTimeMillis,
+ this.minTaskTimeMillis,
+ this.taskCount);
+ } finally {
+ readLock.unlock();
+ }
+ return statistics;
+ }
+
}
/**
@@ -166,4 +279,5 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1;
}
}
+
}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java
index d7866a28..d4751554 100644
--- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java
@@ -20,6 +20,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
+import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
/**
* test for {@link TaskTimeRecordPlugin}
*/
+@Slf4j
public class TaskTimeRecordPluginTest {
@Test
@@ -49,20 +51,29 @@ public class TaskTimeRecordPluginTest {
3, 3, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
- TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin();
+ TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(3);
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(1000L));
executor.submit(() -> ThreadUtil.sleep(3000L));
executor.submit(() -> ThreadUtil.sleep(2000L));
+ executor.submit(() -> ThreadUtil.sleep(2000L));
// waiting for shutdown
executor.shutdown();
while (!executor.isTerminated()) {
}
TaskTimeRecordPlugin.Summary summary = plugin.summarize();
- Assert.assertEquals(1, summary.getMinTaskTimeMillis() / 1000L);
- Assert.assertEquals(3, summary.getMaxTaskTimeMillis() / 1000L);
- Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 1000L);
- Assert.assertEquals(6, summary.getTotalTaskTimeMillis() / 1000L);
+ Assert.assertTrue(testInDeviation(summary.getMinTaskTimeMillis(), 1000L, 300L));
+ Assert.assertTrue(testInDeviation(summary.getMaxTaskTimeMillis(), 3000L, 300L));
+ Assert.assertTrue(testInDeviation(summary.getAvgTaskTimeMillis(), 2000L, 300L));
+ Assert.assertTrue(testInDeviation(summary.getTotalTaskTimeMillis(), 8000L, 300L));
+ }
+
+ private boolean testInDeviation(long except, long actual, long offer) {
+ long exceptLower = except - offer;
+ long exceptUpper = except + offer;
+ log.info("test {} < [{}] < {}", exceptLower, actual, exceptUpper);
+ return exceptLower < actual && actual < exceptUpper;
}
+
}
diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/dto/AlarmControlDTO.java b/hippo4j-message/src/main/java/cn/hippo4j/message/dto/AlarmControlDTO.java
index 11c5128f..73cb7314 100644
--- a/hippo4j-message/src/main/java/cn/hippo4j/message/dto/AlarmControlDTO.java
+++ b/hippo4j-message/src/main/java/cn/hippo4j/message/dto/AlarmControlDTO.java
@@ -18,7 +18,9 @@
package cn.hippo4j.message.dto;
import cn.hippo4j.message.enums.NotifyTypeEnum;
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
/**
* Alarm control DTO.
diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/platform/WeChatSendMessageHandler.java b/hippo4j-message/src/main/java/cn/hippo4j/message/platform/WeChatSendMessageHandler.java
index 68dcbed5..b4fb42bf 100644
--- a/hippo4j-message/src/main/java/cn/hippo4j/message/platform/WeChatSendMessageHandler.java
+++ b/hippo4j-message/src/main/java/cn/hippo4j/message/platform/WeChatSendMessageHandler.java
@@ -70,6 +70,9 @@ public class WeChatSendMessageHandler extends AbstractRobotSendMessageHandler {
}
}
+ /**
+ * WeChat
+ */
@Data
@Accessors(chain = true)
public static class WeChatReqDTO {
@@ -79,6 +82,9 @@ public class WeChatSendMessageHandler extends AbstractRobotSendMessageHandler {
private Markdown markdown;
}
+ /**
+ * Markdown
+ */
@Data
public static class Markdown {
diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/DingAlarmConstants.java b/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/DingAlarmConstants.java
index f8505403..f4bb2285 100644
--- a/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/DingAlarmConstants.java
+++ b/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/DingAlarmConstants.java
@@ -46,8 +46,8 @@ public class DingAlarmConstants {
* Replace task timeout template
*/
public static final String DING_ALARM_TIMEOUT_REPLACE_TXT =
- "任务执行时间:%d / ms \n\n " +
- "超时时间:%d / ms \n\n " +
- DING_ALARM_TIMEOUT_TRACE_REPLACE_TXT +
- " --- \n\n ";
+ "任务执行时间:%d / ms \n\n "
+ + "超时时间:%d / ms \n\n "
+ + DING_ALARM_TIMEOUT_TRACE_REPLACE_TXT
+ + " --- \n\n ";
}
diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/WeChatAlarmConstants.java b/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/WeChatAlarmConstants.java
index 8d4f56f8..8b6001fc 100644
--- a/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/WeChatAlarmConstants.java
+++ b/hippo4j-message/src/main/java/cn/hippo4j/message/platform/constant/WeChatAlarmConstants.java
@@ -36,7 +36,7 @@ public class WeChatAlarmConstants {
* Replace task timeout template
*/
public static final String WE_CHAT_ALARM_TIMOUT_REPLACE_TXT =
- "\n> 任务执行时间:%s / ms \n" +
- "> 超时时间:%s / ms " +
- WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT;
+ "\n> 任务执行时间:%s / ms \n"
+ + "> 超时时间:%s / ms "
+ + WE_CHAT_ALARM_TIMOUT_TRACE_REPLACE_TXT;
}
diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/service/DefaultThreadPoolCheckAlarmHandler.java b/hippo4j-message/src/main/java/cn/hippo4j/message/service/DefaultThreadPoolCheckAlarmHandler.java
index bb155495..de770758 100644
--- a/hippo4j-message/src/main/java/cn/hippo4j/message/service/DefaultThreadPoolCheckAlarmHandler.java
+++ b/hippo4j-message/src/main/java/cn/hippo4j/message/service/DefaultThreadPoolCheckAlarmHandler.java
@@ -64,11 +64,11 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
@Value("${spring.dynamic.thread-pool.check-state-interval:5}")
private Integer checkStateInterval;
- private final ScheduledExecutorService ALARM_NOTIFY_EXECUTOR = new ScheduledThreadPoolExecutor(
+ private final ScheduledExecutorService alarmNotifyExecutor = new ScheduledThreadPoolExecutor(
1,
r -> new Thread(r, "client.alarm.notify"));
- private final ExecutorService ASYNC_ALARM_NOTIFY_EXECUTOR = ThreadPoolBuilder.builder()
+ private final ExecutorService asyncAlarmNotifyExecutor = ThreadPoolBuilder.builder()
.poolThreadSize(2, 4)
.threadFactory("client.execute.timeout.alarm")
.allowCoreThreadTimeOut(true)
@@ -79,7 +79,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
@Override
public void run(String... args) throws Exception {
- ALARM_NOTIFY_EXECUTOR.scheduleWithFixedDelay(this, 0, checkStateInterval, TimeUnit.SECONDS);
+ alarmNotifyExecutor.scheduleWithFixedDelay(this, 0, checkStateInterval, TimeUnit.SECONDS);
}
@Override
@@ -162,7 +162,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest);
}
};
- ASYNC_ALARM_NOTIFY_EXECUTOR.execute(checkPoolRejectedAlarmTask);
+ asyncAlarmNotifyExecutor.execute(checkPoolRejectedAlarmTask);
}
/**
@@ -190,7 +190,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace);
}
Runnable task = () -> hippo4jSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest);
- ASYNC_ALARM_NOTIFY_EXECUTOR.execute(task);
+ asyncAlarmNotifyExecutor.execute(task);
} catch (Throwable ex) {
log.error("Send thread pool execution timeout alarm error.", ex);
}
diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/service/Hippo4jBaseSendMessageService.java b/hippo4j-message/src/main/java/cn/hippo4j/message/service/Hippo4jBaseSendMessageService.java
index 2f5eedbd..0792d6f1 100644
--- a/hippo4j-message/src/main/java/cn/hippo4j/message/service/Hippo4jBaseSendMessageService.java
+++ b/hippo4j-message/src/main/java/cn/hippo4j/message/service/Hippo4jBaseSendMessageService.java
@@ -46,7 +46,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
private final AlarmControlHandler alarmControlHandler;
@Getter
- public final Map> notifyConfigs = new HashMap<>();
+ private final Map> notifyConfigs = new HashMap<>();
private final Map sendMessageHandlers = new HashMap<>();
diff --git a/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/DynamicThreadPoolElasticSearchMonitorHandler.java b/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/DynamicThreadPoolElasticSearchMonitorHandler.java
index 36101cb8..203d2457 100644
--- a/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/DynamicThreadPoolElasticSearchMonitorHandler.java
+++ b/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/DynamicThreadPoolElasticSearchMonitorHandler.java
@@ -25,6 +25,8 @@ import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor;
import cn.hippo4j.monitor.base.MonitorTypeEnum;
import cn.hippo4j.monitor.elasticsearch.model.ElasticSearchThreadPoolRunStateInfo;
+import lombok.Builder;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -63,7 +65,7 @@ public class DynamicThreadPoolElasticSearchMonitorHandler extends AbstractDynami
List rawMapping = FileUtil.readLines(Thread.currentThread().getContextClassLoader().getResource("mapping.json").getPath(), StandardCharsets.UTF_8);
String mapping = String.join(" ", rawMapping);
// if index doesn't exsit, this function may try to create one, but recommend to create index manually.
- this.createIndex(indexName, "_doc", mapping, null, null, null);
+ this.createIndex(EsIndex.builder().index(indexName).type("_doc").mapping(mapping).build());
}
esThreadPoolRunStateInfo.setApplicationName(applicationName);
esThreadPoolRunStateInfo.setId(indexName + "-" + System.currentTimeMillis());
@@ -104,20 +106,20 @@ public class DynamicThreadPoolElasticSearchMonitorHandler extends AbstractDynami
return isIndexExist.get();
}
- public void createIndex(String index, String type, String mapping, Integer shards, Integer replicas, String alias) {
+ public void createIndex(EsIndex esIndex) {
RestHighLevelClient client = ElasticSearchClientHolder.getClient();
boolean acknowledged = false;
- CreateIndexRequest request = new CreateIndexRequest(index);
- if (StringUtils.hasText(mapping)) {
- request.mapping(type, mapping, XContentType.JSON);
+ CreateIndexRequest request = new CreateIndexRequest(esIndex.getIndex());
+ if (StringUtils.hasText(esIndex.getMapping())) {
+ request.mapping(esIndex.getType(), esIndex.getMapping(), XContentType.JSON);
}
- if (!Objects.isNull(shards) && !Objects.isNull(replicas)) {
+ if (!Objects.isNull(esIndex.getShards()) && !Objects.isNull(esIndex.getReplicas())) {
request.settings(Settings.builder()
- .put("index.number_of_shards", shards) // 5
- .put("index.number_of_replicas", replicas));// 1
+ .put("index.number_of_shards", esIndex.getShards())
+ .put("index.number_of_replicas", esIndex.getReplicas()));
}
- if (StringUtils.hasText(alias)) {
- request.alias(new Alias(alias));
+ if (StringUtils.hasText(esIndex.getAlias())) {
+ request.alias(new Alias(esIndex.getAlias()));
}
try {
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
@@ -138,4 +140,18 @@ public class DynamicThreadPoolElasticSearchMonitorHandler extends AbstractDynami
public String getType() {
return MonitorTypeEnum.ELASTICSEARCH.name().toLowerCase();
}
+
+ /**
+ * Es Index
+ */
+ @Getter
+ @Builder
+ private static class EsIndex {
+ String index;
+ String type;
+ String mapping;
+ Integer shards;
+ Integer replicas;
+ String alias;
+ }
}
diff --git a/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/ElasticSearchClientHolder.java b/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/ElasticSearchClientHolder.java
index f30bbdc6..ba428722 100644
--- a/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/ElasticSearchClientHolder.java
+++ b/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/ElasticSearchClientHolder.java
@@ -28,8 +28,6 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.core.env.Environment;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
@@ -66,9 +64,7 @@ public class ElasticSearchClientHolder {
log.info("[ES RestHighLevelClient] success to connect es!host:{},scheme:{}", host, scheme);
return client;
} catch (Exception ex) {
- StringWriter stackTrace = new StringWriter();
- ex.printStackTrace(new PrintWriter(stackTrace));
- log.error("[ES RestHighLevelClient] fail to connect es! cause:{}", stackTrace);
+ log.error("[ES RestHighLevelClient] fail to connect es! cause:", ex);
}
return null;
}
diff --git a/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/model/ElasticSearchThreadPoolRunStateInfo.java b/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/model/ElasticSearchThreadPoolRunStateInfo.java
index cc4da6e1..e72e9a27 100644
--- a/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/model/ElasticSearchThreadPoolRunStateInfo.java
+++ b/hippo4j-monitor/hippo4j-monitor-elasticsearch/src/main/java/cn/hippo4j/monitor/elasticsearch/model/ElasticSearchThreadPoolRunStateInfo.java
@@ -28,7 +28,7 @@ import lombok.Setter;
@Setter
public class ElasticSearchThreadPoolRunStateInfo extends ThreadPoolRunStateInfo {
- private String Id;
+ private String id;
private String applicationName;
}
diff --git a/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/AdapterThreadPoolMicrometerMonitorHandler.java b/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/AdapterThreadPoolMicrometerMonitorHandler.java
index 0db66c68..b5e20608 100644
--- a/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/AdapterThreadPoolMicrometerMonitorHandler.java
+++ b/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/AdapterThreadPoolMicrometerMonitorHandler.java
@@ -35,21 +35,21 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class AdapterThreadPoolMicrometerMonitorHandler extends AbstractAdapterThreadPoolMonitor {
- private final static String METRIC_NAME_PREFIX = "adapter.thread-pool";
+ private static final String METRIC_NAME_PREFIX = "adapter.thread-pool";
- private final static String ADAPTER_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
+ private static final String ADAPTER_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
- private final static String APPLICATION_NAME_TAG = "application.name";
+ private static final String APPLICATION_NAME_TAG = "application.name";
- private final Map RUN_STATE_CACHE = new ConcurrentHashMap<>();
+ private final Map runStateCache = new ConcurrentHashMap<>();
@Override
protected void execute(ThreadPoolAdapterState threadPoolAdapterState) {
- ThreadPoolAdapterState stateInfo = RUN_STATE_CACHE.get(threadPoolAdapterState.getThreadPoolKey());
+ ThreadPoolAdapterState stateInfo = runStateCache.get(threadPoolAdapterState.getThreadPoolKey());
if (stateInfo != null) {
BeanUtil.convert(threadPoolAdapterState, stateInfo);
} else {
- RUN_STATE_CACHE.put(threadPoolAdapterState.getThreadPoolKey(), threadPoolAdapterState);
+ runStateCache.put(threadPoolAdapterState.getThreadPoolKey(), threadPoolAdapterState);
}
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String applicationName = environment.getProperty("spring.application.name", "application");
diff --git a/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java b/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java
index f9c9f1d8..19dac923 100644
--- a/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java
+++ b/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java
@@ -35,21 +35,21 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class DynamicThreadPoolMicrometerMonitorHandler extends AbstractDynamicThreadPoolMonitor {
- private final static String METRIC_NAME_PREFIX = "dynamic.thread-pool";
+ private static final String METRIC_NAME_PREFIX = "dynamic.thread-pool";
- private final static String DYNAMIC_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
+ private static final String DYNAMIC_THREAD_POOL_ID_TAG = METRIC_NAME_PREFIX + ".id";
- private final static String APPLICATION_NAME_TAG = "application.name";
+ private static final String APPLICATION_NAME_TAG = "application.name";
- private final Map RUN_STATE_CACHE = new ConcurrentHashMap<>();
+ private final Map runStateCache = new ConcurrentHashMap<>();
@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
- ThreadPoolRunStateInfo stateInfo = RUN_STATE_CACHE.get(poolRunStateInfo.getTpId());
+ ThreadPoolRunStateInfo stateInfo = runStateCache.get(poolRunStateInfo.getTpId());
if (stateInfo != null) {
BeanUtil.convert(poolRunStateInfo, stateInfo);
} else {
- RUN_STATE_CACHE.put(poolRunStateInfo.getTpId(), poolRunStateInfo);
+ runStateCache.put(poolRunStateInfo.getTpId(), poolRunStateInfo);
}
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String applicationName = environment.getProperty("spring.application.name", "application");
diff --git a/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/WebThreadPoolMicrometerMonitorHandler.java b/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/WebThreadPoolMicrometerMonitorHandler.java
index 3c0795d9..078cb8c1 100644
--- a/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/WebThreadPoolMicrometerMonitorHandler.java
+++ b/hippo4j-monitor/hippo4j-monitor-micrometer/src/main/java/cn/hippo4j/monitor/micrometer/WebThreadPoolMicrometerMonitorHandler.java
@@ -35,21 +35,21 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class WebThreadPoolMicrometerMonitorHandler extends AbstractWebThreadPoolMonitor {
- private final static String METRIC_NAME_PREFIX = "web.thread-pool";
+ private static final String METRIC_NAME_PREFIX = "web.thread-pool";
- private final static String APPLICATION_NAME_TAG = "application.name";
+ private static final String APPLICATION_NAME_TAG = "application.name";
- private final Map RUN_STATE_CACHE = new ConcurrentHashMap<>();
+ private final Map runStateCache = new ConcurrentHashMap<>();
@Override
protected void execute(ThreadPoolRunStateInfo webThreadPoolRunStateInfo) {
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
String applicationName = environment.getProperty("spring.application.name", "application");
- ThreadPoolRunStateInfo stateInfo = RUN_STATE_CACHE.get(applicationName);
+ ThreadPoolRunStateInfo stateInfo = runStateCache.get(applicationName);
if (stateInfo != null) {
BeanUtil.convert(webThreadPoolRunStateInfo, stateInfo);
} else {
- RUN_STATE_CACHE.put(applicationName, webThreadPoolRunStateInfo);
+ runStateCache.put(applicationName, webThreadPoolRunStateInfo);
}
Iterable tags = CollectionUtil.newArrayList(Tag.of(APPLICATION_NAME_TAG, applicationName));
Metrics.gauge(metricName("current.load"), tags, webThreadPoolRunStateInfo, ThreadPoolRunStateInfo::getSimpleCurrentLoad);
diff --git a/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/ServerApplication.java b/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/ServerApplication.java
index 239a632f..e235e69a 100644
--- a/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/ServerApplication.java
+++ b/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/ServerApplication.java
@@ -22,6 +22,9 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
+/**
+ * ServerApplication
+ */
@EnableTransactionManagement
@SpringBootApplication(scanBasePackages = "cn.hippo4j")
@MapperScan(basePackages = {"cn.hippo4j.config.mapper", "cn.hippo4j.auth.mapper"})
diff --git a/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/init/LocalDataSourceLoader.java b/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/init/LocalDataSourceLoader.java
index ae59559b..6cba358f 100644
--- a/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/init/LocalDataSourceLoader.java
+++ b/hippo4j-server/hippo4j-bootstrap/src/main/java/cn/hippo4j/server/init/LocalDataSourceLoader.java
@@ -35,7 +35,8 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
import java.util.Objects;
/**
diff --git a/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java b/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java
index a72b197a..089ed82a 100644
--- a/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java
+++ b/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java
@@ -25,7 +25,12 @@ import cn.hippo4j.discovery.core.InstanceRegistry;
import cn.hippo4j.discovery.core.Lease;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
import java.util.List;
diff --git a/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java b/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java
index ad6ed740..54bf589b 100644
--- a/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java
+++ b/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java
@@ -21,11 +21,13 @@ import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import cn.hippo4j.common.design.observer.AbstractSubjectCenter;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.InstanceInfo.InstanceStatus;
+import cn.hippo4j.common.toolkit.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
@@ -48,26 +50,21 @@ import static cn.hippo4j.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM;
@Service
public class BaseInstanceRegistry implements InstanceRegistry {
- private final int containerSize = 1024;
+ private static final int CONTAINER_SIZE = 1024;
- private final ConcurrentHashMap>> registry = new ConcurrentHashMap(containerSize);
+ private final ConcurrentHashMap>> registry = new ConcurrentHashMap<>(CONTAINER_SIZE);
@Override
public List> listInstance(String appName) {
Map> appNameLeaseMap = registry.get(appName);
- if (CollectionUtils.isEmpty(appNameLeaseMap)) {
- return new ArrayList<>();
- }
- List> appNameLeaseList = new ArrayList<>();
- appNameLeaseMap.values().forEach(each -> appNameLeaseList.add(each));
- return appNameLeaseList;
+ return CollectionUtils.isEmpty(appNameLeaseMap) ? Collections.emptyList() : new ArrayList<>(appNameLeaseMap.values());
}
@Override
public void register(InstanceInfo registrant) {
Map> registerMap = registry.get(registrant.getAppName());
if (registerMap == null) {
- ConcurrentHashMap> registerNewMap = new ConcurrentHashMap(12);
+ ConcurrentHashMap> registerNewMap = new ConcurrentHashMap<>();
registerMap = registry.putIfAbsent(registrant.getAppName(), registerNewMap);
if (registerMap == null) {
registerMap = registerNewMap;
@@ -98,8 +95,11 @@ public class BaseInstanceRegistry implements InstanceRegistry {
String appName = instanceRenew.getAppName();
String instanceId = instanceRenew.getInstanceId();
Map> registryMap = registry.get(appName);
- Lease leaseToRenew;
- if (registryMap == null || (leaseToRenew = registryMap.get(instanceId)) == null) {
+ if (registryMap == null) {
+ return false;
+ }
+ Lease leaseToRenew = registryMap.get(instanceId);
+ if (leaseToRenew == null) {
return false;
}
leaseToRenew.renew();
@@ -111,20 +111,20 @@ public class BaseInstanceRegistry implements InstanceRegistry {
String appName = info.getAppName();
String instanceId = info.getInstanceId();
Map> leaseMap = registry.get(appName);
- if (CollectionUtils.isEmpty(leaseMap)) {
+ if (CollectionUtil.isNotEmpty(leaseMap)) {
+ Lease remove = leaseMap.remove(instanceId);
+ if (remove != null) {
+ log.info("Remove unhealthy node, node ID: {}", instanceId);
+ } else {
+ log.warn("Failed to remove unhealthy node, no instance found: {}", instanceId);
+ }
+ } else {
log.warn("Failed to remove unhealthy node, no application found: {}", appName);
- return;
- }
- Lease remove = leaseMap.remove(instanceId);
- if (remove == null) {
- log.warn("Failed to remove unhealthy node, no instance found: {}", instanceId);
- return;
}
- log.info("Remove unhealthy node, node ID: {}", instanceId);
}
public void evict(long additionalLeaseMs) {
- List> expiredLeases = new ArrayList();
+ List> expiredLeases = new ArrayList<>();
for (Map.Entry>> groupEntry : registry.entrySet()) {
Map> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
@@ -146,7 +146,7 @@ public class BaseInstanceRegistry implements InstanceRegistry {
protected boolean internalCancel(String appName, String id, String identify) {
Map> registerMap = registry.get(appName);
- if (!CollectionUtils.isEmpty(registerMap)) {
+ if (CollectionUtil.isNotEmpty(registerMap)) {
registerMap.remove(id);
AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, () -> identify);
log.info("Clean up unhealthy nodes. Node id: {}", id);
@@ -154,6 +154,9 @@ public class BaseInstanceRegistry implements InstanceRegistry {
return true;
}
+ /**
+ * EvictionTask
+ */
public class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
@@ -177,7 +180,7 @@ public class BaseInstanceRegistry implements InstanceRegistry {
}
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - EVICTION_INTERVAL_TIMER_IN_MS;
- return compensationTime <= 0L ? 0L : compensationTime;
+ return Math.max(compensationTime, 0L);
}
long getCurrentTimeNano() {
@@ -193,7 +196,7 @@ public class BaseInstanceRegistry implements InstanceRegistry {
.daemon(true)
.build());
- private final AtomicReference evictionTaskRef = new AtomicReference();
+ private final AtomicReference evictionTaskRef = new AtomicReference<>();
public void postInit() {
evictionTaskRef.set(new BaseInstanceRegistry.EvictionTask());
diff --git a/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/Lease.java b/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/Lease.java
index ea76264a..0d796f9e 100644
--- a/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/Lease.java
+++ b/hippo4j-server/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/Lease.java
@@ -42,13 +42,13 @@ public class Lease {
private long duration;
- public static final int DEFAULT_DURATION_IN_SECS = 90;
+ public static final long DEFAULT_DURATION_IN_SECS = 90 * 1000L;
public Lease(T r) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
- duration = DEFAULT_DURATION_IN_SECS * 1000;
+ duration = DEFAULT_DURATION_IN_SECS;
}
public void renew() {