优化 hippo4j starter 代码格式.

pull/40/head
chen.ma 3 years ago
parent b1229956f2
commit d545b93511

@ -42,12 +42,11 @@ public class CacheData {
this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(tpId));
this.md5 = getMd5String(content);
this.listeners = new CopyOnWriteArrayList();
}
public void addListener(Listener listener) {
if (null == listener) {
throw new IllegalArgumentException("listener is null");
throw new IllegalArgumentException("Listener is null.");
}
ManagerListenerWrapper managerListenerWrap = new ManagerListenerWrapper(md5, listener);

@ -62,7 +62,7 @@ public class ClientWorker {
});
this.executorService = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder.builder().prefix("client-long-polling-executor").daemon(true).build()
ThreadFactoryBuilder.builder().prefix("client.long.polling.executor").daemon(true).build()
);
log.info("Client identity :: {}", identification);
@ -71,7 +71,7 @@ public class ClientWorker {
try {
checkConfigInfo();
} catch (Throwable e) {
log.error("[Sub check] rotate check error", e);
log.error("Sub check rotate check error.", e);
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
@ -176,7 +176,7 @@ public class ClientWorker {
}
} catch (Exception ex) {
setHealthServer(false);
log.error("[Check update] get changed dataId exception. error message :: {}", ex.getMessage());
log.error("Check update get changed dataId exception. error message :: {}", ex.getMessage());
}
return Collections.emptyList();
@ -194,7 +194,7 @@ public class ClientWorker {
return JSONUtil.toJSONString(result.getData());
}
log.error("[Sub server] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}",
log.error("Sub server namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}",
namespace, itemId, tpId, result.getCode());
return NULL;
}
@ -207,7 +207,7 @@ public class ClientWorker {
try {
response = URLDecoder.decode(response, "UTF-8");
} catch (Exception e) {
log.error("[Polling resp] decode modifiedDataIdsString error", e);
log.error("Polling resp decode modifiedDataIdsString error.", e);
}
@ -222,7 +222,7 @@ public class ClientWorker {
updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
log.info("Refresh thread pool changed. [{}]", dataId);
} else {
log.error("[{}] [Polling resp] invalid dataIdAndGroup error.", dataIdAndGroup);
log.error("[{}] Polling resp invalid dataIdAndGroup error.", dataIdAndGroup);
}
}
}

@ -13,8 +13,14 @@ import lombok.Data;
@AllArgsConstructor
public class ConfigEmptyException extends RuntimeException {
/**
* description
*/
private String description;
/**
* action
*/
private String action;
}

@ -7,7 +7,6 @@ import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.common.web.exception.ErrorCodeEnum;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder;
import cn.hippo4j.starter.toolkit.thread.ThreadPoolBuilder;
import cn.hutool.core.text.StrBuilder;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
@ -15,9 +14,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.*;
import static cn.hippo4j.common.constant.Constants.BASE_PATH;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY;
/**
* Discovery client.
@ -28,8 +30,6 @@ import static cn.hippo4j.common.constant.Constants.*;
@Slf4j
public class DiscoveryClient implements DisposableBean {
private final ThreadPoolExecutor heartbeatExecutor;
private final ScheduledExecutorService scheduler;
private final HttpAgent httpAgent;
@ -46,16 +46,10 @@ public class DiscoveryClient implements DisposableBean {
this.httpAgent = httpAgent;
this.instanceInfo = instanceInfo;
this.appPathIdentifier = instanceInfo.getAppName().toUpperCase() + "/" + instanceInfo.getInstanceId();
this.heartbeatExecutor = ThreadPoolBuilder.builder()
.poolThreadSize(1, 5)
.keepAliveTime(0, TimeUnit.SECONDS)
.workQueue(new SynchronousQueue())
.threadFactory("DiscoveryClient-HeartbeatExecutor", true)
.build();
this.scheduler = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("DiscoveryClient-Scheduler").build()
ThreadFactoryBuilder.builder().daemon(true).prefix("client.discovery.scheduler").build()
);
register();

@ -127,13 +127,13 @@ public abstract class DynamicExecutorConfigurationSupport extends ThreadPoolExec
if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) {
if (log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor" +
(this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate");
(this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
}
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor" +
(this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate");
(this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
}
Thread.currentThread().interrupt();
}

@ -15,7 +15,6 @@ import cn.hippo4j.starter.toolkit.thread.ThreadPoolBuilder;
import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.task.TaskDecorator;
@ -54,24 +53,24 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE)
.capacity(1024)
.allowCoreThreadTimeOut(true)
.threadFactory("dynamic-threadPool-change-config")
.threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolExecutor) {
var dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
DynamicThreadPool dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) {
return bean;
}
var dynamicExecutor = (DynamicThreadPoolExecutor) bean;
var wrap = new DynamicThreadPoolWrapper(dynamicExecutor.getThreadPoolId(), dynamicExecutor);
var remoteExecutor = fillPoolAndRegister(wrap);
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) bean;
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicExecutor.getThreadPoolId(), dynamicExecutor);
ThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap);
subscribeConfig(wrap);
return remoteExecutor;
} else if (bean instanceof DynamicThreadPoolWrapper) {
var wrap = (DynamicThreadPoolWrapper) bean;
DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean;
registerAndSubscribe(wrap);
}
@ -136,7 +135,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
poolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setExecutor(poolExecutor);
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
} finally {
if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(tpId));

@ -72,7 +72,7 @@ public class ThreadPoolDynamicRefresh {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(parameter.getCapacity());
} else {
log.warn("[Pool change] The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName());
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName());
}
}

@ -21,7 +21,7 @@ import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS;
public class MonitorEventExecutor {
private static final ExecutorService EVENT_EXECUTOR = ThreadPoolBuilder.builder()
.threadFactory("monitor-event-executor")
.threadFactory("client.monitor.event.executor")
.corePoolSize(AVAILABLE_PROCESSORS)
.maxPoolNum(AVAILABLE_PROCESSORS)
.workQueue(QueueTypeEnum.LINKED_BLOCKING_QUEUE)

@ -80,7 +80,7 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
Integer bufferSize = properties.getTaskBufferSize();
messageCollectVessel = new ArrayBlockingQueue(bufferSize);
String collectVesselTaskName = "collect-data-scheduled";
String collectVesselTaskName = "client.scheduled.collect.data";
collectVesselExecutor = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix(collectVesselTaskName).build()
@ -95,7 +95,7 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
);
// 启动上报监控数据线程
String reportingTaskName = "reporting-task";
String reportingTaskName = "client.thread.reporting.task";
ThreadUtil.newThread(this, reportingTaskName, Boolean.TRUE).start();
// 获取所有数据采集组件, 目前仅有历史运行数据采集
@ -111,7 +111,7 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
}
/**
* 线,
* 线, .
*/
private void runTimeGatherTask() {
boolean healthStatus = serverHealthCheck.isHealthStatus();

@ -44,11 +44,11 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali
private final Condition healthCondition = healthMainLock.newCondition();
/**
* Health check executor.
* Health check executor
*/
private final ScheduledThreadPoolExecutor healthCheckExecutor = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("health-check-scheduled").build()
ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.health.check").build()
);
/**

@ -46,7 +46,7 @@ public class ServerHttpAgent implements HttpAgent {
this.securityProxy.applyToken(this.serverListManager.getServerUrls());
this.executorService = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("token-security-updater").build()
ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.token.security.updater").build()
);
this.executorService.scheduleWithFixedDelay(

@ -8,6 +8,13 @@ package cn.hippo4j.starter.toolkit;
*/
public class CalculateUtil {
/**
* Divide.
*
* @param num1
* @param num2
* @return
*/
public static int divide(int num1, int num2) {
return ((int) (Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100));
}

@ -27,7 +27,7 @@ public class HttpClientUtil {
private static int HTTP_OK_CODE = 200;
/**
* Get
* Get .
*
* @param url
* @return
@ -43,7 +43,7 @@ public class HttpClientUtil {
}
/**
* Get ,
* Get , .
*
* @param url
* @param queryString
@ -55,7 +55,7 @@ public class HttpClientUtil {
}
/**
* Json
* Json .
*
* @param url
* @param clazz
@ -67,7 +67,7 @@ public class HttpClientUtil {
}
/**
*
* .
*
* @param url
* @param clazz
@ -81,7 +81,7 @@ public class HttpClientUtil {
}
/**
* Get ,
* Get , .
*
* @param url
* @param queryString
@ -96,7 +96,7 @@ public class HttpClientUtil {
}
/**
* Rest Post
* Rest Post .
*
* @param url
* @param body
@ -112,8 +112,8 @@ public class HttpClientUtil {
}
/**
* Rest Post
*
* Rest Post .
* .
*
* @param url
* @param body
@ -125,7 +125,7 @@ public class HttpClientUtil {
}
/**
* Url
* Url.
*
* @param url
* @param queryString

@ -21,9 +21,9 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class AbstractBuildThreadPoolTemplate {
/**
* 线
* 线.
* <p>
* , , abstract
* , , abstract.
* {@link AbstractQueuedSynchronizer#tryAcquire}
*
* @return
@ -33,7 +33,7 @@ public class AbstractBuildThreadPoolTemplate {
}
/**
* 线
* 线.
*
* @return
*/
@ -43,7 +43,7 @@ public class AbstractBuildThreadPoolTemplate {
}
/**
* 线
* 线.
*
* @return
*/
@ -67,7 +67,7 @@ public class AbstractBuildThreadPoolTemplate {
}
/**
* 线
* 线.
*
* @return
*/
@ -77,7 +77,7 @@ public class AbstractBuildThreadPoolTemplate {
}
/**
* 线
* 线.
*
* @return
*/
@ -102,7 +102,7 @@ public class AbstractBuildThreadPoolTemplate {
}
/**
* 线
* 线.
*
* @param initParam
* @return

@ -48,7 +48,7 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate {
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("队列容量已满.", rx);
throw new RejectedExecutionException("The blocking queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();

@ -49,7 +49,7 @@ public class RejectedPolicies {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.error("线程池添加队列任务失败", e);
log.error("Adding Queue task to thread pool failed.", e);
}
};
}

@ -47,4 +47,5 @@ public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable>
}
return super.offer(o, timeout, unit);
}
}

@ -40,7 +40,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
/**
* 线线
* 线线.
*
* @param backingThreadFactory 线线
* @return this
@ -51,7 +51,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
}
/**
* 线, mb-thread- 线 mb-thread-1
* 线, mb-thread- 线 mb-thread-1 .
*
* @param namePrefix 线
* @return this
@ -62,7 +62,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
}
/**
* 线
* 线.
*
* @param daemon 线
* @return this
@ -73,7 +73,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
}
/**
* 线
* 线.
*
* @param priority
* @return this
@ -93,7 +93,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
}
/**
*
* .
*
* @param uncaughtExceptionHandler {@link Thread.UncaughtExceptionHandler}
*/
@ -102,7 +102,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
}
/**
*
* .
*
* @return
*/
@ -121,7 +121,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
}
/**
*
* .
*
* @param builder {@link ThreadFactoryBuilder}
* @return {@link ThreadFactory}

@ -245,7 +245,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
}
/**
*
* .
*
* @return
*/
@ -258,7 +258,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
}
/**
*
* .
*
* @return
*/
@ -267,7 +267,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
}
/**
* 线
* 线.
*
* @param builder
* @return
@ -277,7 +277,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
}
/**
* 线
* 线.
*
* @param builder
* @return
@ -287,7 +287,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
}
/**
* 线
* 线.
*
* @param builder
* @return
@ -297,7 +297,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
}
/**
*
* .
*
* @param builder
* @return

@ -23,7 +23,7 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
}
private Exception clientTrace() {
return new Exception("tread task root stack trace");
return new Exception("Tread task root stack trace.");
}
@Override

@ -18,14 +18,30 @@ import java.util.concurrent.ThreadPoolExecutor;
@Data
public class DynamicThreadPoolWrapper implements DisposableBean {
/**
* Tenant id
*/
private String tenantId;
/**
* Item id
*/
private String itemId;
/**
* Thread pool id
*/
private String tpId;
/**
* Subscribe flag
*/
private boolean subscribeFlag;
/**
* executor
* {@link cn.hippo4j.starter.core.DynamicThreadPoolExecutor}
*/
private ThreadPoolExecutor executor;
/**
@ -38,7 +54,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
}
/**
* 线, 使 threadPoolExecutor
* 线, 使 threadPoolExecutor.
*
* @param threadPoolId
* @param threadPoolExecutor
@ -49,7 +65,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
}
/**
*
* .
*
* @param command
*/
@ -58,7 +74,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
}
/**
*
* .
*
* @param task
* @return
@ -68,7 +84,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
}
/**
*
* .
*
* @param task
* @param <T>

@ -1,7 +1,9 @@
package cn.hippo4j.starter.wrapper;
import cn.hippo4j.starter.core.Listener;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
@ -12,15 +14,18 @@ import lombok.Setter;
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ManagerListenerWrapper {
/**
* Last call md5
*/
private String lastCallMd5;
final Listener listener;
public ManagerListenerWrapper(String md5, Listener listener) {
this.lastCallMd5 = md5;
this.listener = listener;
}
/**
* Listener
*/
private Listener listener;
}

Loading…
Cancel
Save