Provide server services to view registered nodes and health status in real time.

pull/161/head
chen.ma 3 years ago
parent 3cf2bbc988
commit 5cf3009db9

@ -44,4 +44,8 @@ public class Constants {
public static final String GROUP_KEY_DELIMITER = "+"; public static final String GROUP_KEY_DELIMITER = "+";
public static final long EVICTION_INTERVAL_TIMER_IN_MS = 60 * 1000;
public static final int SCHEDULED_THREAD_CORE_NUM = 1;
} }

@ -0,0 +1,135 @@
package com.github.dynamic.threadpool.common.model;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
/**
* Instance Info.
*
* @author chen.ma
* @date 2021/7/13 22:10
*/
@Slf4j
@Getter
@Setter
public class InstanceInfo {
private static final String UNKNOWN = "unknown";
private String appName = UNKNOWN;
private String hostName;
private String instanceId;
private volatile String vipAddress;
private volatile String secureVipAddress;
private volatile ActionType actionType;
private volatile boolean isInstanceInfoDirty = false;
private volatile Long lastUpdatedTimestamp;
private volatile Long lastDirtyTimestamp;
private volatile InstanceStatus status = InstanceStatus.UP;
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
public InstanceInfo() {
this.lastUpdatedTimestamp = System.currentTimeMillis();
this.lastDirtyTimestamp = lastUpdatedTimestamp;
}
public void setLastUpdatedTimestamp() {
this.lastUpdatedTimestamp = System.currentTimeMillis();
}
public Long getLastDirtyTimestamp() {
return lastDirtyTimestamp;
}
public synchronized void setOverriddenStatus(InstanceStatus status) {
if (this.overriddenStatus != status) {
this.overriddenStatus = status;
}
}
public InstanceStatus getStatus() {
return status;
}
public synchronized void setIsDirty() {
isInstanceInfoDirty = true;
lastDirtyTimestamp = System.currentTimeMillis();
}
public synchronized long setIsDirtyWithTime() {
setIsDirty();
return lastDirtyTimestamp;
}
public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
isInstanceInfoDirty = false;
}
}
public void setActionType(ActionType actionType) {
this.actionType = actionType;
}
public enum InstanceStatus {
UP,
DOWN,
STARTING,
OUT_OF_SERVICE,
UNKNOWN;
public static InstanceStatus toEnum(String s) {
if (s != null) {
try {
return InstanceStatus.valueOf(s.toUpperCase());
} catch (IllegalArgumentException e) {
// ignore and fall through to unknown
log.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
}
}
return UNKNOWN;
}
}
public enum ActionType {
ADDED,
MODIFIED,
DELETED
}
@Data
@Accessors(chain = true)
public static class InstanceRenew {
private String appName;
private String instanceId;
private String lastDirtyTimestamp;
private String status;
}
}

@ -22,8 +22,17 @@
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<scope>test</scope> </dependency>
<dependency>
<groupId>com.github.dynamic-threadpool</groupId>
<artifactId>dynamic-threadpool-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>

@ -0,0 +1,26 @@
package com.github.dynamic.threadpool.registry.config;
import com.github.dynamic.threadpool.registry.core.BaseInstanceRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* Registry Configuration.
*
* @author chen.ma
* @date 2021/8/12 21:48
*/
@Configuration
public class RegistryConfiguration {
@Autowired
private BaseInstanceRegistry baseInstanceRegistry;
@PostConstruct
public void registryInit() {
baseInstanceRegistry.postInit();
}
}

@ -0,0 +1,55 @@
package com.github.dynamic.threadpool.registry.controller;
import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.common.web.base.Results;
import com.github.dynamic.threadpool.common.web.exception.ErrorCodeEnum;
import com.github.dynamic.threadpool.registry.core.InstanceRegistry;
import com.github.dynamic.threadpool.registry.core.Lease;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import static com.github.dynamic.threadpool.common.constant.Constants.BASE_PATH;
/**
* Application Controller.
*
* @author chen.ma
* @date 2021/8/8 22:24
*/
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping(BASE_PATH + "/apps")
public class ApplicationController {
@NonNull
private final InstanceRegistry instanceRegistry;
@GetMapping("/{appName}")
public Result applications(@PathVariable String appName) {
List<Lease<InstanceInfo>> resultInstanceList = instanceRegistry.listInstance(appName);
return Results.success(resultInstanceList);
}
@PostMapping("/register")
public Result addInstance(@RequestBody InstanceInfo instanceInfo) {
instanceRegistry.register(instanceInfo);
return Results.success();
}
@PostMapping("/renew")
public Result renew(@RequestBody InstanceInfo.InstanceRenew instanceRenew) {
boolean isSuccess = instanceRegistry.renew(instanceRenew);
if (!isSuccess) {
log.warn("Not Found (Renew) :: {} - {}", instanceRenew.getAppName(), instanceRenew.getInstanceId());
return Results.failure(ErrorCodeEnum.NOT_FOUND);
}
return Results.success();
}
}

@ -0,0 +1,289 @@
package com.github.dynamic.threadpool.registry.core;
import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.common.model.InstanceInfo.InstanceStatus;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.github.dynamic.threadpool.common.constant.Constants.EVICTION_INTERVAL_TIMER_IN_MS;
import static com.github.dynamic.threadpool.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM;
/**
* Base Instance Registry.
*
* @author chen.ma
* @date 2021/8/8 22:46
*/
@Slf4j
@Service
public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
private final int CONTAINER_SIZE = 1024;
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock read = readWriteLock.readLock();
protected final Object lock = new Object();
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap(CONTAINER_SIZE);
protected volatile int expectedNumberOfClientsSendingRenews;
private final CircularQueue<Pair<Long, String>> recentRegisteredQueue;
private final CircularQueue<Pair<Long, String>> recentCanceledQueue;
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue();
protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
.newBuilder().initialCapacity(512)
.expireAfterAccess(1, TimeUnit.HOURS)
.<String, InstanceStatus>build().asMap();
public BaseInstanceRegistry() {
this.recentRegisteredQueue = new CircularQueue(CONTAINER_SIZE);
this.recentCanceledQueue = new CircularQueue(CONTAINER_SIZE);
}
@Override
public List<Lease<InstanceInfo>> listInstance(String appName) {
Map<String, Lease<InstanceInfo>> appNameLeaseMap = registry.get(appName);
if (CollectionUtils.isEmpty(appNameLeaseMap)) {
return Lists.newArrayList();
}
List<Lease<InstanceInfo>> appNameLeaseList = Lists.newArrayList();
appNameLeaseMap.values().forEach(each -> appNameLeaseList.add(each));
return appNameLeaseList;
}
@Override
public void register(InstanceInfo registrant) {
read.lock();
try {
Map<String, Lease<InstanceInfo>> registerMap = registry.get(registrant.getAppName());
if (registerMap == null) {
ConcurrentHashMap<String, Lease<InstanceInfo>> registerNewMap = new ConcurrentHashMap(12);
registerMap = registry.putIfAbsent(registrant.getAppName(), registerNewMap);
if (registerMap == null) {
registerMap = registerNewMap;
}
}
Lease<InstanceInfo> existingLease = registerMap.get(registrant.getInstanceId());
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
}
Lease<InstanceInfo> lease = new Lease(registrant);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
registerMap.put(registrant.getInstanceId(), lease);
recentRegisteredQueue.add(new Pair(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getInstanceId() + ")"));
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getInstanceId());
if (overriddenStatusFromMap != null) {
log.info("Storing overridden status :: {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(InstanceInfo.ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
} finally {
read.unlock();
}
}
@Override
public boolean renew(InstanceInfo.InstanceRenew instanceRenew) {
String appName = instanceRenew.getAppName();
String instanceId = instanceRenew.getInstanceId();
Map<String, Lease<InstanceInfo>> registryMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (registryMap == null || (leaseToRenew = registryMap.get(instanceId)) == null) {
return false;
}
leaseToRenew.renew();
return true;
}
static class CircularQueue<E> extends AbstractQueue<E> {
private final ArrayBlockingQueue<E> delegate;
public CircularQueue(int capacity) {
this.delegate = new ArrayBlockingQueue(capacity);
}
@Override
public Iterator<E> iterator() {
return delegate.iterator();
}
@Override
public int size() {
return delegate.size();
}
@Override
public boolean offer(E e) {
while (!delegate.offer(e)) {
delegate.poll();
}
return true;
}
@Override
public E poll() {
return delegate.poll();
}
@Override
public E peek() {
return delegate.peek();
}
@Override
public void clear() {
delegate.clear();
}
@Override
public Object[] toArray() {
return delegate.toArray();
}
}
private static final class RecentlyChangedItem {
private long lastUpdateTime;
private Lease<InstanceInfo> leaseInfo;
public RecentlyChangedItem(Lease<InstanceInfo> lease) {
this.leaseInfo = lease;
lastUpdateTime = System.currentTimeMillis();
}
public long getLastUpdateTime() {
return this.lastUpdateTime;
}
public Lease<InstanceInfo> getLeaseInfo() {
return this.leaseInfo;
}
}
public void evict(long additionalLeaseMs) {
List<Lease<InstanceInfo>> expiredLeases = new ArrayList();
for (Map.Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Map.Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
for (Lease<InstanceInfo> expiredLease : expiredLeases) {
String appName = expiredLease.getHolder().getAppName();
String id = expiredLease.getHolder().getInstanceId();
internalCancel(appName, id, false);
}
}
protected boolean internalCancel(String appName, String id, boolean isReplication) {
read.lock();
try {
Map<String, Lease<InstanceInfo>> registerMap = registry.get(appName);
if (!CollectionUtils.isEmpty(registerMap)) {
registerMap.remove(id);
}
} finally {
read.unlock();
}
return true;
}
public class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
log.info("Running the evict task with compensationTime {} ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
log.error("Could not run the evict task", e);
}
}
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0L) {
return 0L;
}
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - EVICTION_INTERVAL_TIMER_IN_MS;
return compensationTime <= 0L ? 0L : compensationTime;
}
long getCurrentTimeNano() {
return System.nanoTime();
}
}
private final ScheduledExecutorService scheduledExecutorService =
new ScheduledThreadPoolExecutor(
SCHEDULED_THREAD_CORE_NUM,
new ThreadFactoryBuilder()
.setNameFormat("registry-eviction")
.setDaemon(true)
.build()
);
private final AtomicReference<EvictionTask> evictionTaskRef = new AtomicReference();
public void postInit() {
evictionTaskRef.set(new BaseInstanceRegistry.EvictionTask());
scheduledExecutorService.scheduleWithFixedDelay(evictionTaskRef.get(),
EVICTION_INTERVAL_TIMER_IN_MS, EVICTION_INTERVAL_TIMER_IN_MS, TimeUnit.MILLISECONDS);
}
}

@ -0,0 +1,38 @@
package com.github.dynamic.threadpool.registry.core;
import com.github.dynamic.threadpool.common.model.InstanceInfo;
import java.util.List;
/**
* Instance Registry.
*
* @author chen.ma
* @date 2021/8/8 22:31
*/
public interface InstanceRegistry<T> {
/**
* list Instance.
*
* @param appName
* @return
*/
List<Lease<T>> listInstance(String appName);
/**
* register.
*
* @param info
*/
void register(T info);
/**
* renew.
*
* @param instanceRenew
* @return
*/
boolean renew(InstanceInfo.InstanceRenew instanceRenew);
}

@ -0,0 +1,87 @@
package com.github.dynamic.threadpool.registry.core;
/**
* Lease.
*
* @author chen.ma
* @date 2021/8/8 22:49
*/
public class Lease<T> {
enum Action {
REGISTER, CANCEL, RENEW
}
private T holder;
private long evictionTimestamp;
private long registrationTimestamp;
private long serviceUpTimestamp;
/**
* Make it volatile so that the expiration task would see this quicker
*/
private volatile long lastUpdateTimestamp;
private long duration;
public static final int DEFAULT_DURATION_IN_SECS = 90;
public Lease(T r) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
duration = DEFAULT_DURATION_IN_SECS * 1000;
}
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
public void cancel() {
if (evictionTimestamp <= 0) {
evictionTimestamp = System.currentTimeMillis();
}
}
public void serviceUp() {
if (serviceUpTimestamp == 0) {
serviceUpTimestamp = System.currentTimeMillis();
}
}
public void setServiceUpTimestamp(long serviceUpTimestamp) {
this.serviceUpTimestamp = serviceUpTimestamp;
}
public boolean isExpired() {
return isExpired(0L);
}
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
public long getRegistrationTimestamp() {
return registrationTimestamp;
}
public long getLastRenewalTimestamp() {
return lastUpdateTimestamp;
}
public long getEvictionTimestamp() {
return evictionTimestamp;
}
public long getServiceUpTimestamp() {
return serviceUpTimestamp;
}
public T getHolder() {
return holder;
}
}

@ -0,0 +1,36 @@
package com.github.dynamic.threadpool.registry.core;
/**
* Pair.
*
* @author chen.ma
* @date 2021/8/8 23:04
*/
public class Pair<E1, E2> {
public E1 first() {
return first;
}
public void setFirst(E1 first) {
this.first = first;
}
public E2 second() {
return second;
}
public void setSecond(E2 second) {
this.second = second;
}
private E1 first;
private E2 second;
public Pair(E1 first, E2 second) {
this.first = first;
this.second = second;
}
}

@ -0,0 +1,36 @@
package com.github.dynamic.threadpool.registry.core;
import com.github.dynamic.threadpool.common.model.InstanceInfo.InstanceStatus;
/**
* Status Override Result.
*
* @author chen.ma
* @date 2021/8/8 23:11
*/
public class StatusOverrideResult {
public static StatusOverrideResult NO_MATCH = new StatusOverrideResult(false, null);
public static StatusOverrideResult matchingStatus(InstanceStatus status) {
return new StatusOverrideResult(true, status);
}
private final boolean matches;
private final InstanceStatus status;
private StatusOverrideResult(boolean matches, InstanceStatus status) {
this.matches = matches;
this.status = status;
}
public boolean matches() {
return matches;
}
public InstanceStatus status() {
return status;
}
}

@ -1,13 +1,15 @@
package com.github.dynamic.threadpool.starter.config; package com.github.dynamic.threadpool.starter.config;
import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.starter.core.DiscoveryClient; import com.github.dynamic.threadpool.starter.core.DiscoveryClient;
import com.github.dynamic.threadpool.starter.core.InstanceConfig;
import com.github.dynamic.threadpool.starter.core.InstanceInfo;
import com.github.dynamic.threadpool.starter.remote.HttpAgent; import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import java.net.InetAddress;
import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId; import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId;
/** /**
@ -22,20 +24,19 @@ public class DiscoveryConfig {
private ConfigurableEnvironment environment; private ConfigurableEnvironment environment;
@Bean @Bean
public InstanceConfig instanceConfig() { @SneakyThrows
public InstanceInfo instanceConfig() {
InstanceInfo instanceInfo = new InstanceInfo(); InstanceInfo instanceInfo = new InstanceInfo();
instanceInfo.setInstanceId(getDefaultInstanceId(environment)); instanceInfo.setInstanceId(getDefaultInstanceId(environment));
instanceInfo.setAppName(environment.getProperty("spring.application.name"));
String hostNameKey = "eureka.instance.hostname"; instanceInfo.setHostName(InetAddress.getLocalHost().getHostAddress());
String hostNameVal = environment.containsProperty(hostNameKey) ? environment.getProperty(hostNameKey) : "";
instanceInfo.setHostName(hostNameVal);
return instanceInfo; return instanceInfo;
} }
@Bean @Bean
public DiscoveryClient discoveryClient(HttpAgent httpAgent, InstanceConfig instanceConfig) { public DiscoveryClient discoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {
return new DiscoveryClient(httpAgent, instanceConfig); return new DiscoveryClient(httpAgent, instanceInfo);
} }
} }

@ -1,6 +1,10 @@
package com.github.dynamic.threadpool.starter.core; package com.github.dynamic.threadpool.starter.core;
import com.github.dynamic.threadpool.common.constant.Constants;
import com.github.dynamic.threadpool.common.model.InstanceInfo;
import com.github.dynamic.threadpool.common.web.base.Result; import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.common.web.base.Results;
import com.github.dynamic.threadpool.common.web.exception.ErrorCodeEnum;
import com.github.dynamic.threadpool.starter.remote.HttpAgent; import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadFactoryBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadFactoryBuilder;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
@ -23,25 +27,26 @@ public class DiscoveryClient {
private final HttpAgent httpAgent; private final HttpAgent httpAgent;
private final InstanceConfig instanceConfig; private final InstanceInfo instanceInfo;
private volatile long lastSuccessfulHeartbeatTimestamp = -1; private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private static final String PREFIX = "DiscoveryClient_"; private static final String PREFIX = "DiscoveryClient_";
private String appPathIdentifier; private final String appPathIdentifier;
public DiscoveryClient(HttpAgent httpAgent, InstanceConfig instanceConfig) { public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {
this.httpAgent = httpAgent; this.httpAgent = httpAgent;
this.instanceConfig = instanceConfig; this.instanceInfo = instanceInfo;
heartbeatExecutor = ThreadPoolBuilder.builder() this.appPathIdentifier = instanceInfo.getAppName().toUpperCase() + "/" + instanceInfo.getInstanceId();
this.heartbeatExecutor = ThreadPoolBuilder.builder()
.poolThreadSize(1, 5) .poolThreadSize(1, 5)
.keepAliveTime(0, TimeUnit.SECONDS) .keepAliveTime(0, TimeUnit.SECONDS)
.workQueue(new SynchronousQueue()) .workQueue(new SynchronousQueue())
.threadFactory("DiscoveryClient-HeartbeatExecutor", true) .threadFactory("DiscoveryClient-HeartbeatExecutor", true)
.build(); .build();
scheduler = Executors.newScheduledThreadPool(2, this.scheduler = Executors.newScheduledThreadPool(2,
ThreadFactoryBuilder.builder() ThreadFactoryBuilder.builder()
.daemon(true) .daemon(true)
.prefix("DiscoveryClient-Scheduler") .prefix("DiscoveryClient-Scheduler")
@ -55,23 +60,23 @@ public class DiscoveryClient {
} }
private void initScheduledTasks() { private void initScheduledTasks() {
scheduler.schedule(new HeartbeatThread(), 30, TimeUnit.SECONDS); scheduler.scheduleWithFixedDelay(new HeartbeatThread(), 30, 30, TimeUnit.SECONDS);
} }
boolean register() { boolean register() {
log.info("{}{} :: registering service...", PREFIX, appPathIdentifier); log.info("{}{} - registering service...", PREFIX, appPathIdentifier);
String urlPath = "/apps/" + appPathIdentifier;
String urlPath = Constants.BASE_PATH + "/apps/register/";
Result registerResult = null; Result registerResult = null;
try { try {
registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceConfig); registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceInfo);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("{} {} - registration failed :: {}.", PREFIX, appPathIdentifier, ex.getMessage(), ex); registerResult = Results.failure(ErrorCodeEnum.SERVICE_ERROR);
throw ex; log.error("{}{} - registration failed :: {}", PREFIX, appPathIdentifier, ex.getMessage(), ex);
} }
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("{} {} - registration status: {}.", PREFIX, appPathIdentifier, registerResult.getCode()); log.info("{}{} - registration status :: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ? "success" : "fail");
} }
return registerResult.isSuccess(); return registerResult.isSuccess();
@ -85,12 +90,32 @@ public class DiscoveryClient {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
} }
} }
} }
boolean renew() { boolean renew() {
Result renewResult = null;
return true; try {
InstanceInfo.InstanceRenew instanceRenew = new InstanceInfo.InstanceRenew()
.setAppName(instanceInfo.getAppName())
.setInstanceId(instanceInfo.getInstanceId())
.setLastDirtyTimestamp(instanceInfo.getLastDirtyTimestamp().toString())
.setStatus(instanceInfo.getStatus().toString());
renewResult = httpAgent.httpPostByDiscovery(Constants.BASE_PATH + "/apps/renew", instanceRenew);
if (renewResult.getCode() == ErrorCodeEnum.NOT_FOUND.getCode()) {
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return renewResult.isSuccess();
} catch (Exception ex) {
log.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, ex);
return false;
}
} }
} }

@ -1,25 +0,0 @@
package com.github.dynamic.threadpool.starter.core;
/**
* Dynamic thread pool instance configuration.
*
* @author chen.ma
* @date 2021/8/6 21:31
*/
public interface InstanceConfig {
/**
* get Host Name.
*
* @return
*/
String getHostName();
/**
* get Instance Id.
*
* @return
*/
String getInstanceId();
}

@ -1,25 +0,0 @@
package com.github.dynamic.threadpool.starter.core;
import lombok.Getter;
import lombok.Setter;
/**
* Instance Info.
*
* @author chen.ma
* @date 2021/7/13 22:10
*/
@Getter
@Setter
public class InstanceInfo implements InstanceConfig {
private static final String UNKNOWN = "unknown";
private String appName = UNKNOWN;
private String hostName;
private String instanceId;
}

@ -32,13 +32,13 @@ public interface HttpAgent {
String getEncode(); String getEncode();
/** /**
* Http Get By Discovery * Http Post By Discovery
* *
* @param url * @param path
* @param body * @param body
* @return * @return
*/ */
Result httpPostByDiscovery(String url, Object body); Result httpPostByDiscovery(String path, Object body);
/** /**
* Http Get By * Http Get By

@ -42,8 +42,8 @@ public class ServerHttpAgent implements HttpAgent {
} }
@Override @Override
public Result httpPostByDiscovery(String url, Object body) { public Result httpPostByDiscovery(String path, Object body) {
return httpClientUtil.restApiPost(url, body, Result.class); return httpClientUtil.restApiPost(buildUrl(path), body, Result.class);
} }
@Override @Override

@ -1,7 +1,10 @@
package com.github.dynamic.threadpool.starter.toolkit; package com.github.dynamic.threadpool.starter.toolkit;
import lombok.SneakyThrows;
import org.springframework.core.env.PropertyResolver; import org.springframework.core.env.PropertyResolver;
import java.net.InetAddress;
/** /**
* Cloud Common Id Util. * Cloud Common Id Util.
* *
@ -12,8 +15,10 @@ public class CloudCommonIdUtil {
private static final String SEPARATOR = ":"; private static final String SEPARATOR = ":";
@SneakyThrows
public static String getDefaultInstanceId(PropertyResolver resolver) { public static String getDefaultInstanceId(PropertyResolver resolver) {
String hostname = resolver.getProperty("spring.cloud.client.hostname"); InetAddress host = InetAddress.getLocalHost();
String hostname = host.getHostAddress();
String appName = resolver.getProperty("spring.application.name"); String appName = resolver.getProperty("spring.application.name");
String namePart = combineParts(hostname, SEPARATOR, appName); String namePart = combineParts(hostname, SEPARATOR, appName);
String indexPart = resolver.getProperty("spring.application.instance_id", resolver.getProperty("server.port")); String indexPart = resolver.getProperty("spring.application.instance_id", resolver.getProperty("server.port"));

Loading…
Cancel
Save