From 5cf3009db93767882efc51e3588ea3b221d72c05 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Sat, 14 Aug 2021 20:36:59 +0800 Subject: [PATCH] Provide server services to view registered nodes and health status in real time. --- .../threadpool/common/constant/Constants.java | 4 + .../threadpool/common/model/InstanceInfo.java | 135 ++++++++ dynamic-threadpool-registry/pom.xml | 13 +- .../config/RegistryConfiguration.java | 26 ++ .../controller/ApplicationController.java | 55 ++++ .../registry/core/BaseInstanceRegistry.java | 289 ++++++++++++++++++ .../registry/core/InstanceRegistry.java | 38 +++ .../threadpool/registry/core/Lease.java | 87 ++++++ .../threadpool/registry/core/Pair.java | 36 +++ .../registry/core/StatusOverrideResult.java | 36 +++ .../starter/config/DiscoveryConfig.java | 19 +- .../starter/core/DiscoveryClient.java | 57 +++- .../starter/core/InstanceConfig.java | 25 -- .../threadpool/starter/core/InstanceInfo.java | 25 -- .../threadpool/starter/remote/HttpAgent.java | 6 +- .../starter/remote/ServerHttpAgent.java | 4 +- .../starter/toolkit/CloudCommonIdUtil.java | 7 +- 17 files changed, 779 insertions(+), 83 deletions(-) create mode 100644 dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java create mode 100644 dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/config/RegistryConfiguration.java create mode 100644 dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/controller/ApplicationController.java create mode 100644 dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/BaseInstanceRegistry.java create mode 100644 dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/InstanceRegistry.java create mode 100644 dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Lease.java create mode 100644 dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Pair.java create mode 100644 dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/StatusOverrideResult.java delete mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceConfig.java delete mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceInfo.java diff --git a/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java b/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java index 34274215..7e89c491 100644 --- a/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java +++ b/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java @@ -44,4 +44,8 @@ public class Constants { public static final String GROUP_KEY_DELIMITER = "+"; + public static final long EVICTION_INTERVAL_TIMER_IN_MS = 60 * 1000; + + public static final int SCHEDULED_THREAD_CORE_NUM = 1; + } diff --git a/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java b/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java new file mode 100644 index 00000000..cf817ba5 --- /dev/null +++ b/dynamic-threadpool-common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java @@ -0,0 +1,135 @@ +package com.github.dynamic.threadpool.common.model; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +/** + * Instance Info. + * + * @author chen.ma + * @date 2021/7/13 22:10 + */ +@Slf4j +@Getter +@Setter +public class InstanceInfo { + + private static final String UNKNOWN = "unknown"; + + private String appName = UNKNOWN; + + private String hostName; + + private String instanceId; + + private volatile String vipAddress; + + private volatile String secureVipAddress; + + private volatile ActionType actionType; + + private volatile boolean isInstanceInfoDirty = false; + + private volatile Long lastUpdatedTimestamp; + + private volatile Long lastDirtyTimestamp; + + private volatile InstanceStatus status = InstanceStatus.UP; + + private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN; + + public InstanceInfo() { + this.lastUpdatedTimestamp = System.currentTimeMillis(); + this.lastDirtyTimestamp = lastUpdatedTimestamp; + } + + public void setLastUpdatedTimestamp() { + this.lastUpdatedTimestamp = System.currentTimeMillis(); + } + + public Long getLastDirtyTimestamp() { + return lastDirtyTimestamp; + } + + public synchronized void setOverriddenStatus(InstanceStatus status) { + if (this.overriddenStatus != status) { + this.overriddenStatus = status; + } + } + + public InstanceStatus getStatus() { + return status; + } + + public synchronized void setIsDirty() { + isInstanceInfoDirty = true; + lastDirtyTimestamp = System.currentTimeMillis(); + } + + public synchronized long setIsDirtyWithTime() { + setIsDirty(); + return lastDirtyTimestamp; + } + + public synchronized void unsetIsDirty(long unsetDirtyTimestamp) { + if (lastDirtyTimestamp <= unsetDirtyTimestamp) { + isInstanceInfoDirty = false; + } + } + + public void setActionType(ActionType actionType) { + this.actionType = actionType; + } + + public enum InstanceStatus { + + UP, + + DOWN, + + STARTING, + + OUT_OF_SERVICE, + + UNKNOWN; + + public static InstanceStatus toEnum(String s) { + if (s != null) { + try { + return InstanceStatus.valueOf(s.toUpperCase()); + } catch (IllegalArgumentException e) { + // ignore and fall through to unknown + log.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN); + } + } + return UNKNOWN; + } + } + + public enum ActionType { + ADDED, + + MODIFIED, + + DELETED + } + + @Data + @Accessors(chain = true) + public static class InstanceRenew { + + private String appName; + + private String instanceId; + + private String lastDirtyTimestamp; + + private String status; + + } + +} + diff --git a/dynamic-threadpool-registry/pom.xml b/dynamic-threadpool-registry/pom.xml index 12ebde36..56db9d87 100644 --- a/dynamic-threadpool-registry/pom.xml +++ b/dynamic-threadpool-registry/pom.xml @@ -22,8 +22,17 @@ org.springframework.boot - spring-boot-starter-test - test + spring-boot-starter-web + + + + com.github.dynamic-threadpool + dynamic-threadpool-common + + + + com.google.guava + guava diff --git a/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/config/RegistryConfiguration.java b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/config/RegistryConfiguration.java new file mode 100644 index 00000000..ee4ae415 --- /dev/null +++ b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/config/RegistryConfiguration.java @@ -0,0 +1,26 @@ +package com.github.dynamic.threadpool.registry.config; + +import com.github.dynamic.threadpool.registry.core.BaseInstanceRegistry; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; + +/** + * Registry Configuration. + * + * @author chen.ma + * @date 2021/8/12 21:48 + */ +@Configuration +public class RegistryConfiguration { + + @Autowired + private BaseInstanceRegistry baseInstanceRegistry; + + @PostConstruct + public void registryInit() { + baseInstanceRegistry.postInit(); + } + +} diff --git a/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/controller/ApplicationController.java b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/controller/ApplicationController.java new file mode 100644 index 00000000..05b79677 --- /dev/null +++ b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/controller/ApplicationController.java @@ -0,0 +1,55 @@ +package com.github.dynamic.threadpool.registry.controller; + +import com.github.dynamic.threadpool.common.model.InstanceInfo; +import com.github.dynamic.threadpool.common.web.base.Result; +import com.github.dynamic.threadpool.common.web.base.Results; +import com.github.dynamic.threadpool.common.web.exception.ErrorCodeEnum; +import com.github.dynamic.threadpool.registry.core.InstanceRegistry; +import com.github.dynamic.threadpool.registry.core.Lease; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +import static com.github.dynamic.threadpool.common.constant.Constants.BASE_PATH; + +/** + * Application Controller. + * + * @author chen.ma + * @date 2021/8/8 22:24 + */ +@Slf4j +@RestController +@RequiredArgsConstructor +@RequestMapping(BASE_PATH + "/apps") +public class ApplicationController { + + @NonNull + private final InstanceRegistry instanceRegistry; + + @GetMapping("/{appName}") + public Result applications(@PathVariable String appName) { + List> resultInstanceList = instanceRegistry.listInstance(appName); + return Results.success(resultInstanceList); + } + + @PostMapping("/register") + public Result addInstance(@RequestBody InstanceInfo instanceInfo) { + instanceRegistry.register(instanceInfo); + return Results.success(); + } + + @PostMapping("/renew") + public Result renew(@RequestBody InstanceInfo.InstanceRenew instanceRenew) { + boolean isSuccess = instanceRegistry.renew(instanceRenew); + if (!isSuccess) { + log.warn("Not Found (Renew) :: {} - {}", instanceRenew.getAppName(), instanceRenew.getInstanceId()); + return Results.failure(ErrorCodeEnum.NOT_FOUND); + } + return Results.success(); + } + +} diff --git a/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/BaseInstanceRegistry.java b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/BaseInstanceRegistry.java new file mode 100644 index 00000000..a0cc3213 --- /dev/null +++ b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/BaseInstanceRegistry.java @@ -0,0 +1,289 @@ +package com.github.dynamic.threadpool.registry.core; + +import com.github.dynamic.threadpool.common.model.InstanceInfo; +import com.github.dynamic.threadpool.common.model.InstanceInfo.InstanceStatus; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.github.dynamic.threadpool.common.constant.Constants.EVICTION_INTERVAL_TIMER_IN_MS; +import static com.github.dynamic.threadpool.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM; + +/** + * Base Instance Registry. + * + * @author chen.ma + * @date 2021/8/8 22:46 + */ +@Slf4j +@Service +public class BaseInstanceRegistry implements InstanceRegistry { + + private final int CONTAINER_SIZE = 1024; + + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + private final Lock read = readWriteLock.readLock(); + + protected final Object lock = new Object(); + + private final ConcurrentHashMap>> registry = new ConcurrentHashMap(CONTAINER_SIZE); + + protected volatile int expectedNumberOfClientsSendingRenews; + + private final CircularQueue> recentRegisteredQueue; + + private final CircularQueue> recentCanceledQueue; + + private ConcurrentLinkedQueue recentlyChangedQueue = new ConcurrentLinkedQueue(); + + protected final ConcurrentMap overriddenInstanceStatusMap = CacheBuilder + .newBuilder().initialCapacity(512) + .expireAfterAccess(1, TimeUnit.HOURS) + .build().asMap(); + + public BaseInstanceRegistry() { + this.recentRegisteredQueue = new CircularQueue(CONTAINER_SIZE); + this.recentCanceledQueue = new CircularQueue(CONTAINER_SIZE); + } + + @Override + public List> listInstance(String appName) { + Map> appNameLeaseMap = registry.get(appName); + if (CollectionUtils.isEmpty(appNameLeaseMap)) { + return Lists.newArrayList(); + } + + List> appNameLeaseList = Lists.newArrayList(); + appNameLeaseMap.values().forEach(each -> appNameLeaseList.add(each)); + return appNameLeaseList; + } + + @Override + public void register(InstanceInfo registrant) { + read.lock(); + try { + Map> registerMap = registry.get(registrant.getAppName()); + if (registerMap == null) { + ConcurrentHashMap> registerNewMap = new ConcurrentHashMap(12); + registerMap = registry.putIfAbsent(registrant.getAppName(), registerNewMap); + if (registerMap == null) { + registerMap = registerNewMap; + } + } + + Lease existingLease = registerMap.get(registrant.getInstanceId()); + if (existingLease != null && (existingLease.getHolder() != null)) { + Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); + Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); + + if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { + registrant = existingLease.getHolder(); + } + } + + Lease lease = new Lease(registrant); + if (existingLease != null) { + lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); + } + registerMap.put(registrant.getInstanceId(), lease); + + recentRegisteredQueue.add(new Pair( + System.currentTimeMillis(), + registrant.getAppName() + "(" + registrant.getInstanceId() + ")")); + + InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getInstanceId()); + if (overriddenStatusFromMap != null) { + log.info("Storing overridden status :: {} from map", overriddenStatusFromMap); + registrant.setOverriddenStatus(overriddenStatusFromMap); + } + + if (InstanceStatus.UP.equals(registrant.getStatus())) { + lease.serviceUp(); + } + + registrant.setActionType(InstanceInfo.ActionType.ADDED); + recentlyChangedQueue.add(new RecentlyChangedItem(lease)); + registrant.setLastUpdatedTimestamp(); + } finally { + read.unlock(); + } + } + + @Override + public boolean renew(InstanceInfo.InstanceRenew instanceRenew) { + String appName = instanceRenew.getAppName(); + String instanceId = instanceRenew.getInstanceId(); + + Map> registryMap = registry.get(appName); + Lease leaseToRenew = null; + if (registryMap == null || (leaseToRenew = registryMap.get(instanceId)) == null) { + return false; + } + + leaseToRenew.renew(); + return true; + } + + static class CircularQueue extends AbstractQueue { + + private final ArrayBlockingQueue delegate; + + public CircularQueue(int capacity) { + this.delegate = new ArrayBlockingQueue(capacity); + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean offer(E e) { + while (!delegate.offer(e)) { + delegate.poll(); + } + return true; + } + + @Override + public E poll() { + return delegate.poll(); + } + + @Override + public E peek() { + return delegate.peek(); + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public Object[] toArray() { + return delegate.toArray(); + } + + } + + private static final class RecentlyChangedItem { + private long lastUpdateTime; + + private Lease leaseInfo; + + public RecentlyChangedItem(Lease lease) { + this.leaseInfo = lease; + lastUpdateTime = System.currentTimeMillis(); + } + + public long getLastUpdateTime() { + return this.lastUpdateTime; + } + + public Lease getLeaseInfo() { + return this.leaseInfo; + } + } + + public void evict(long additionalLeaseMs) { + List> expiredLeases = new ArrayList(); + for (Map.Entry>> groupEntry : registry.entrySet()) { + Map> leaseMap = groupEntry.getValue(); + if (leaseMap != null) { + for (Map.Entry> leaseEntry : leaseMap.entrySet()) { + Lease lease = leaseEntry.getValue(); + if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { + expiredLeases.add(lease); + } + } + } + } + + for (Lease expiredLease : expiredLeases) { + String appName = expiredLease.getHolder().getAppName(); + String id = expiredLease.getHolder().getInstanceId(); + internalCancel(appName, id, false); + } + } + + protected boolean internalCancel(String appName, String id, boolean isReplication) { + read.lock(); + try { + Map> registerMap = registry.get(appName); + if (!CollectionUtils.isEmpty(registerMap)) { + registerMap.remove(id); + } + } finally { + read.unlock(); + } + + return true; + } + + public class EvictionTask extends TimerTask { + + private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L); + + @Override + public void run() { + try { + long compensationTimeMs = getCompensationTimeMs(); + log.info("Running the evict task with compensationTime {} ms", compensationTimeMs); + evict(compensationTimeMs); + } catch (Throwable e) { + log.error("Could not run the evict task", e); + } + } + + long getCompensationTimeMs() { + long currNanos = getCurrentTimeNano(); + long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); + if (lastNanos == 0L) { + return 0L; + } + + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); + long compensationTime = elapsedMs - EVICTION_INTERVAL_TIMER_IN_MS; + return compensationTime <= 0L ? 0L : compensationTime; + } + + long getCurrentTimeNano() { + return System.nanoTime(); + } + } + + private final ScheduledExecutorService scheduledExecutorService = + new ScheduledThreadPoolExecutor( + SCHEDULED_THREAD_CORE_NUM, + new ThreadFactoryBuilder() + .setNameFormat("registry-eviction") + .setDaemon(true) + .build() + ); + + private final AtomicReference evictionTaskRef = new AtomicReference(); + + public void postInit() { + evictionTaskRef.set(new BaseInstanceRegistry.EvictionTask()); + scheduledExecutorService.scheduleWithFixedDelay(evictionTaskRef.get(), + EVICTION_INTERVAL_TIMER_IN_MS, EVICTION_INTERVAL_TIMER_IN_MS, TimeUnit.MILLISECONDS); + } + +} diff --git a/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/InstanceRegistry.java b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/InstanceRegistry.java new file mode 100644 index 00000000..4d13f109 --- /dev/null +++ b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/InstanceRegistry.java @@ -0,0 +1,38 @@ +package com.github.dynamic.threadpool.registry.core; + +import com.github.dynamic.threadpool.common.model.InstanceInfo; + +import java.util.List; + +/** + * Instance Registry. + * + * @author chen.ma + * @date 2021/8/8 22:31 + */ +public interface InstanceRegistry { + + /** + * list Instance. + * + * @param appName + * @return + */ + List> listInstance(String appName); + + /** + * register. + * + * @param info + */ + void register(T info); + + /** + * renew. + * + * @param instanceRenew + * @return + */ + boolean renew(InstanceInfo.InstanceRenew instanceRenew); + +} diff --git a/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Lease.java b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Lease.java new file mode 100644 index 00000000..45104057 --- /dev/null +++ b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Lease.java @@ -0,0 +1,87 @@ +package com.github.dynamic.threadpool.registry.core; + +/** + * Lease. + * + * @author chen.ma + * @date 2021/8/8 22:49 + */ +public class Lease { + + enum Action { + REGISTER, CANCEL, RENEW + } + + private T holder; + + private long evictionTimestamp; + + private long registrationTimestamp; + + private long serviceUpTimestamp; + + /** + * Make it volatile so that the expiration task would see this quicker + */ + private volatile long lastUpdateTimestamp; + + private long duration; + + public static final int DEFAULT_DURATION_IN_SECS = 90; + + public Lease(T r) { + holder = r; + registrationTimestamp = System.currentTimeMillis(); + lastUpdateTimestamp = registrationTimestamp; + duration = DEFAULT_DURATION_IN_SECS * 1000; + } + + public void renew() { + lastUpdateTimestamp = System.currentTimeMillis() + duration; + } + + public void cancel() { + if (evictionTimestamp <= 0) { + evictionTimestamp = System.currentTimeMillis(); + } + } + + public void serviceUp() { + if (serviceUpTimestamp == 0) { + serviceUpTimestamp = System.currentTimeMillis(); + } + } + + public void setServiceUpTimestamp(long serviceUpTimestamp) { + this.serviceUpTimestamp = serviceUpTimestamp; + } + + public boolean isExpired() { + return isExpired(0L); + } + + public boolean isExpired(long additionalLeaseMs) { + return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); + } + + public long getRegistrationTimestamp() { + return registrationTimestamp; + } + + public long getLastRenewalTimestamp() { + return lastUpdateTimestamp; + } + + public long getEvictionTimestamp() { + return evictionTimestamp; + } + + public long getServiceUpTimestamp() { + return serviceUpTimestamp; + } + + public T getHolder() { + return holder; + } + +} diff --git a/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Pair.java b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Pair.java new file mode 100644 index 00000000..0b485c68 --- /dev/null +++ b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/Pair.java @@ -0,0 +1,36 @@ +package com.github.dynamic.threadpool.registry.core; + +/** + * Pair. + * + * @author chen.ma + * @date 2021/8/8 23:04 + */ +public class Pair { + + public E1 first() { + return first; + } + + public void setFirst(E1 first) { + this.first = first; + } + + public E2 second() { + return second; + } + + public void setSecond(E2 second) { + this.second = second; + } + + private E1 first; + + private E2 second; + + public Pair(E1 first, E2 second) { + this.first = first; + this.second = second; + } + +} diff --git a/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/StatusOverrideResult.java b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/StatusOverrideResult.java new file mode 100644 index 00000000..e62c7b6f --- /dev/null +++ b/dynamic-threadpool-registry/src/main/java/com/github/dynamic/threadpool/registry/core/StatusOverrideResult.java @@ -0,0 +1,36 @@ +package com.github.dynamic.threadpool.registry.core; + +import com.github.dynamic.threadpool.common.model.InstanceInfo.InstanceStatus; + +/** + * Status Override Result. + * + * @author chen.ma + * @date 2021/8/8 23:11 + */ +public class StatusOverrideResult { + + public static StatusOverrideResult NO_MATCH = new StatusOverrideResult(false, null); + + public static StatusOverrideResult matchingStatus(InstanceStatus status) { + return new StatusOverrideResult(true, status); + } + + private final boolean matches; + + private final InstanceStatus status; + + private StatusOverrideResult(boolean matches, InstanceStatus status) { + this.matches = matches; + this.status = status; + } + + public boolean matches() { + return matches; + } + + public InstanceStatus status() { + return status; + } + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java index 5f9d5782..3f83f4ca 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java @@ -1,13 +1,15 @@ package com.github.dynamic.threadpool.starter.config; +import com.github.dynamic.threadpool.common.model.InstanceInfo; import com.github.dynamic.threadpool.starter.core.DiscoveryClient; -import com.github.dynamic.threadpool.starter.core.InstanceConfig; -import com.github.dynamic.threadpool.starter.core.InstanceInfo; import com.github.dynamic.threadpool.starter.remote.HttpAgent; import lombok.AllArgsConstructor; +import lombok.SneakyThrows; import org.springframework.context.annotation.Bean; import org.springframework.core.env.ConfigurableEnvironment; +import java.net.InetAddress; + import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId; /** @@ -22,20 +24,19 @@ public class DiscoveryConfig { private ConfigurableEnvironment environment; @Bean - public InstanceConfig instanceConfig() { + @SneakyThrows + public InstanceInfo instanceConfig() { InstanceInfo instanceInfo = new InstanceInfo(); instanceInfo.setInstanceId(getDefaultInstanceId(environment)); - - String hostNameKey = "eureka.instance.hostname"; - String hostNameVal = environment.containsProperty(hostNameKey) ? environment.getProperty(hostNameKey) : ""; - instanceInfo.setHostName(hostNameVal); + instanceInfo.setAppName(environment.getProperty("spring.application.name")); + instanceInfo.setHostName(InetAddress.getLocalHost().getHostAddress()); return instanceInfo; } @Bean - public DiscoveryClient discoveryClient(HttpAgent httpAgent, InstanceConfig instanceConfig) { - return new DiscoveryClient(httpAgent, instanceConfig); + public DiscoveryClient discoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) { + return new DiscoveryClient(httpAgent, instanceInfo); } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java index 2983b251..5f55ee19 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java @@ -1,6 +1,10 @@ package com.github.dynamic.threadpool.starter.core; +import com.github.dynamic.threadpool.common.constant.Constants; +import com.github.dynamic.threadpool.common.model.InstanceInfo; import com.github.dynamic.threadpool.common.web.base.Result; +import com.github.dynamic.threadpool.common.web.base.Results; +import com.github.dynamic.threadpool.common.web.exception.ErrorCodeEnum; import com.github.dynamic.threadpool.starter.remote.HttpAgent; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadFactoryBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; @@ -23,25 +27,26 @@ public class DiscoveryClient { private final HttpAgent httpAgent; - private final InstanceConfig instanceConfig; + private final InstanceInfo instanceInfo; private volatile long lastSuccessfulHeartbeatTimestamp = -1; private static final String PREFIX = "DiscoveryClient_"; - private String appPathIdentifier; + private final String appPathIdentifier; - public DiscoveryClient(HttpAgent httpAgent, InstanceConfig instanceConfig) { + public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) { this.httpAgent = httpAgent; - this.instanceConfig = instanceConfig; - heartbeatExecutor = ThreadPoolBuilder.builder() + this.instanceInfo = instanceInfo; + this.appPathIdentifier = instanceInfo.getAppName().toUpperCase() + "/" + instanceInfo.getInstanceId(); + this.heartbeatExecutor = ThreadPoolBuilder.builder() .poolThreadSize(1, 5) .keepAliveTime(0, TimeUnit.SECONDS) .workQueue(new SynchronousQueue()) .threadFactory("DiscoveryClient-HeartbeatExecutor", true) .build(); - scheduler = Executors.newScheduledThreadPool(2, + this.scheduler = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder.builder() .daemon(true) .prefix("DiscoveryClient-Scheduler") @@ -55,23 +60,23 @@ public class DiscoveryClient { } private void initScheduledTasks() { - scheduler.schedule(new HeartbeatThread(), 30, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(new HeartbeatThread(), 30, 30, TimeUnit.SECONDS); } boolean register() { - log.info("{}{} :: registering service...", PREFIX, appPathIdentifier); - String urlPath = "/apps/" + appPathIdentifier; + log.info("{}{} - registering service...", PREFIX, appPathIdentifier); + String urlPath = Constants.BASE_PATH + "/apps/register/"; Result registerResult = null; try { - registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceConfig); + registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceInfo); } catch (Exception ex) { - log.warn("{} {} - registration failed :: {}.", PREFIX, appPathIdentifier, ex.getMessage(), ex); - throw ex; + registerResult = Results.failure(ErrorCodeEnum.SERVICE_ERROR); + log.error("{}{} - registration failed :: {}", PREFIX, appPathIdentifier, ex.getMessage(), ex); } if (log.isInfoEnabled()) { - log.info("{} {} - registration status: {}.", PREFIX, appPathIdentifier, registerResult.getCode()); + log.info("{}{} - registration status :: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ? "success" : "fail"); } return registerResult.isSuccess(); @@ -85,12 +90,32 @@ public class DiscoveryClient { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } - } boolean renew() { - - return true; + Result renewResult = null; + try { + InstanceInfo.InstanceRenew instanceRenew = new InstanceInfo.InstanceRenew() + .setAppName(instanceInfo.getAppName()) + .setInstanceId(instanceInfo.getInstanceId()) + .setLastDirtyTimestamp(instanceInfo.getLastDirtyTimestamp().toString()) + .setStatus(instanceInfo.getStatus().toString()); + + renewResult = httpAgent.httpPostByDiscovery(Constants.BASE_PATH + "/apps/renew", instanceRenew); + + if (renewResult.getCode() == ErrorCodeEnum.NOT_FOUND.getCode()) { + long timestamp = instanceInfo.setIsDirtyWithTime(); + boolean success = register(); + if (success) { + instanceInfo.unsetIsDirty(timestamp); + } + return success; + } + return renewResult.isSuccess(); + } catch (Exception ex) { + log.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, ex); + return false; + } } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceConfig.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceConfig.java deleted file mode 100644 index dfdd123e..00000000 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceConfig.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.github.dynamic.threadpool.starter.core; - -/** - * Dynamic thread pool instance configuration. - * - * @author chen.ma - * @date 2021/8/6 21:31 - */ -public interface InstanceConfig { - - /** - * get Host Name. - * - * @return - */ - String getHostName(); - - /** - * get Instance Id. - * - * @return - */ - String getInstanceId(); - -} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceInfo.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceInfo.java deleted file mode 100644 index 41db20e8..00000000 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/InstanceInfo.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.github.dynamic.threadpool.starter.core; - -import lombok.Getter; -import lombok.Setter; - -/** - * Instance Info. - * - * @author chen.ma - * @date 2021/7/13 22:10 - */ -@Getter -@Setter -public class InstanceInfo implements InstanceConfig { - - private static final String UNKNOWN = "unknown"; - - private String appName = UNKNOWN; - - private String hostName; - - private String instanceId; - -} - diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java index 66f49848..faf64cb4 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java @@ -32,13 +32,13 @@ public interface HttpAgent { String getEncode(); /** - * 发起 Http Get 请求 By Discovery + * 发起 Http Post 请求 By Discovery * - * @param url + * @param path * @param body * @return */ - Result httpPostByDiscovery(String url, Object body); + Result httpPostByDiscovery(String path, Object body); /** * 发起 Http Get 请求 By 动态配置 diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java index b873bfcb..07ce80f7 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java @@ -42,8 +42,8 @@ public class ServerHttpAgent implements HttpAgent { } @Override - public Result httpPostByDiscovery(String url, Object body) { - return httpClientUtil.restApiPost(url, body, Result.class); + public Result httpPostByDiscovery(String path, Object body) { + return httpClientUtil.restApiPost(buildUrl(path), body, Result.class); } @Override diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java index 6ac32383..8119f712 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/toolkit/CloudCommonIdUtil.java @@ -1,7 +1,10 @@ package com.github.dynamic.threadpool.starter.toolkit; +import lombok.SneakyThrows; import org.springframework.core.env.PropertyResolver; +import java.net.InetAddress; + /** * Cloud Common Id Util. * @@ -12,8 +15,10 @@ public class CloudCommonIdUtil { private static final String SEPARATOR = ":"; + @SneakyThrows public static String getDefaultInstanceId(PropertyResolver resolver) { - String hostname = resolver.getProperty("spring.cloud.client.hostname"); + InetAddress host = InetAddress.getLocalHost(); + String hostname = host.getHostAddress(); String appName = resolver.getProperty("spring.application.name"); String namePart = combineParts(hostname, SEPARATOR, appName); String indexPart = resolver.getProperty("spring.application.instance_id", resolver.getProperty("server.port"));