服务注册核心逻辑重构. (#91)

pull/106/head
chen.ma 4 years ago
parent 7936dd5ea4
commit 7cc3b42ad2

@ -6,8 +6,7 @@ import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.common.web.exception.ErrorCodeEnum; import cn.hippo4j.common.web.exception.ErrorCodeEnum;
import cn.hippo4j.discovery.core.InstanceRegistry; import cn.hippo4j.discovery.core.InstanceRegistry;
import cn.hippo4j.discovery.core.Lease; import cn.hippo4j.discovery.core.Lease;
import lombok.NonNull; import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
@ -23,11 +22,10 @@ import static cn.hippo4j.common.constant.Constants.BASE_PATH;
*/ */
@Slf4j @Slf4j
@RestController @RestController
@RequiredArgsConstructor @AllArgsConstructor
@RequestMapping(BASE_PATH + "/apps") @RequestMapping(BASE_PATH + "/apps")
public class ApplicationController { public class ApplicationController {
@NonNull
private final InstanceRegistry instanceRegistry; private final InstanceRegistry instanceRegistry;
@GetMapping("/{appName}") @GetMapping("/{appName}")

@ -3,19 +3,22 @@ package cn.hippo4j.discovery.core;
import cn.hippo4j.common.design.observer.AbstractSubjectCenter; import cn.hippo4j.common.design.observer.AbstractSubjectCenter;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.InstanceInfo.InstanceStatus; import cn.hippo4j.common.model.InstanceInfo.InstanceStatus;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.*; import java.util.ArrayList;
import java.util.concurrent.*; import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static cn.hippo4j.common.constant.Constants.EVICTION_INTERVAL_TIMER_IN_MS; import static cn.hippo4j.common.constant.Constants.EVICTION_INTERVAL_TIMER_IN_MS;
import static cn.hippo4j.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM; import static cn.hippo4j.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM;
@ -23,6 +26,11 @@ import static cn.hippo4j.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM;
/** /**
* Base instance registry. * Base instance registry.
* *
* <p>
* Reference from Eureka.
* Service registration, service offline, service renewal.
* </p>
*
* @author chen.ma * @author chen.ma
* @date 2021/8/8 22:46 * @date 2021/8/8 22:46
*/ */
@ -30,33 +38,9 @@ import static cn.hippo4j.common.constant.Constants.SCHEDULED_THREAD_CORE_NUM;
@Service @Service
public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> { public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
private final int CONTAINER_SIZE = 1024; private final int containerSize = 1024;
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock read = readWriteLock.readLock();
protected final Object lock = new Object();
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap(CONTAINER_SIZE);
protected volatile int expectedNumberOfClientsSendingRenews;
private final CircularQueue<Pair<Long, String>> recentRegisteredQueue;
private final CircularQueue<Pair<Long, String>> recentCanceledQueue;
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue(); private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap(containerSize);
protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
.newBuilder().initialCapacity(512)
.expireAfterAccess(1, TimeUnit.HOURS)
.<String, InstanceStatus>build().asMap();
public BaseInstanceRegistry() {
this.recentRegisteredQueue = new CircularQueue(CONTAINER_SIZE);
this.recentCanceledQueue = new CircularQueue(CONTAINER_SIZE);
}
@Override @Override
public List<Lease<InstanceInfo>> listInstance(String appName) { public List<Lease<InstanceInfo>> listInstance(String appName) {
@ -71,54 +55,38 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
} }
@Override @Override
public void register(InstanceInfo registrant) { public synchronized void register(InstanceInfo registrant) {
read.lock(); Map<String, Lease<InstanceInfo>> registerMap = registry.get(registrant.getAppName());
try { if (registerMap == null) {
Map<String, Lease<InstanceInfo>> registerMap = registry.get(registrant.getAppName()); ConcurrentHashMap<String, Lease<InstanceInfo>> registerNewMap = new ConcurrentHashMap(12);
registerMap = registry.putIfAbsent(registrant.getAppName(), registerNewMap);
if (registerMap == null) { if (registerMap == null) {
ConcurrentHashMap<String, Lease<InstanceInfo>> registerNewMap = new ConcurrentHashMap(12); registerMap = registerNewMap;
registerMap = registry.putIfAbsent(registrant.getAppName(), registerNewMap);
if (registerMap == null) {
registerMap = registerNewMap;
}
}
Lease<InstanceInfo> existingLease = registerMap.get(registrant.getInstanceId());
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} }
}
Lease<InstanceInfo> lease = new Lease(registrant); Lease<InstanceInfo> existingLease = registerMap.get(registrant.getInstanceId());
if (existingLease != null) { if (existingLease != null && (existingLease.getHolder() != null)) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
} Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
registerMap.put(registrant.getInstanceId(), lease);
recentRegisteredQueue.add(new Pair(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getInstanceId() + ")"));
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getInstanceId()); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
if (overriddenStatusFromMap != null) { registrant = existingLease.getHolder();
log.info("Storing overridden status :: {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
} }
}
if (InstanceStatus.UP.equals(registrant.getStatus())) { Lease<InstanceInfo> lease = new Lease(registrant);
lease.serviceUp(); if (existingLease != null) {
} lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
registerMap.put(registrant.getInstanceId(), lease);
registrant.setActionType(InstanceInfo.ActionType.ADDED); if (InstanceStatus.UP.equals(registrant.getStatus())) {
recentlyChangedQueue.add(new RecentlyChangedItem(lease)); lease.serviceUp();
registrant.setLastUpdatedTimestamp();
} finally {
read.unlock();
} }
registrant.setActionType(InstanceInfo.ActionType.ADDED);
registrant.setLastUpdatedTimestamp();
} }
@Override @Override
@ -127,7 +95,7 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
String instanceId = instanceRenew.getInstanceId(); String instanceId = instanceRenew.getInstanceId();
Map<String, Lease<InstanceInfo>> registryMap = registry.get(appName); Map<String, Lease<InstanceInfo>> registryMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null; Lease<InstanceInfo> leaseToRenew;
if (registryMap == null || (leaseToRenew = registryMap.get(instanceId)) == null) { if (registryMap == null || (leaseToRenew = registryMap.get(instanceId)) == null) {
return false; return false;
} }
@ -138,94 +106,21 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
@Override @Override
public void remove(InstanceInfo info) { public void remove(InstanceInfo info) {
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); String appName = info.getAppName();
writeLock.lock(); String instanceId = info.getInstanceId();
try { Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName);
String appName = info.getAppName(); if (CollectionUtils.isEmpty(leaseMap)) {
String instanceId = info.getInstanceId(); log.warn("Failed to remove unhealthy node, no application found :: {}", appName);
Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName); return;
if (CollectionUtils.isEmpty(leaseMap)) {
log.warn("Failed to remove unhealthy node, no application found :: {}", appName);
return;
}
Lease<InstanceInfo> remove = leaseMap.remove(instanceId);
if (remove == null) {
log.warn("Failed to remove unhealthy node, no instance found :: {}", instanceId);
return;
}
log.info("Remove unhealthy node, node ID :: {}", instanceId);
} finally {
writeLock.unlock();
} }
}
static class CircularQueue<E> extends AbstractQueue<E> { Lease<InstanceInfo> remove = leaseMap.remove(instanceId);
if (remove == null) {
private final ArrayBlockingQueue<E> delegate; log.warn("Failed to remove unhealthy node, no instance found :: {}", instanceId);
return;
public CircularQueue(int capacity) {
this.delegate = new ArrayBlockingQueue(capacity);
}
@Override
public Iterator<E> iterator() {
return delegate.iterator();
}
@Override
public int size() {
return delegate.size();
}
@Override
public boolean offer(E e) {
while (!delegate.offer(e)) {
delegate.poll();
}
return true;
}
@Override
public E poll() {
return delegate.poll();
}
@Override
public E peek() {
return delegate.peek();
}
@Override
public void clear() {
delegate.clear();
}
@Override
public Object[] toArray() {
return delegate.toArray();
} }
} log.info("Remove unhealthy node, node ID :: {}", instanceId);
private static final class RecentlyChangedItem {
private long lastUpdateTime;
private Lease<InstanceInfo> leaseInfo;
public RecentlyChangedItem(Lease<InstanceInfo> lease) {
this.leaseInfo = lease;
lastUpdateTime = System.currentTimeMillis();
}
public long getLastUpdateTime() {
return this.lastUpdateTime;
}
public Lease<InstanceInfo> getLeaseInfo() {
return this.leaseInfo;
}
} }
public void evict(long additionalLeaseMs) { public void evict(long additionalLeaseMs) {
@ -246,23 +141,17 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
String appName = expiredLease.getHolder().getAppName(); String appName = expiredLease.getHolder().getAppName();
String id = expiredLease.getHolder().getInstanceId(); String id = expiredLease.getHolder().getInstanceId();
String identify = expiredLease.getHolder().getIdentify(); String identify = expiredLease.getHolder().getIdentify();
internalCancel(appName, id, identify, false); internalCancel(appName, id, identify);
} }
} }
protected boolean internalCancel(String appName, String id, String identify, boolean isReplication) { protected boolean internalCancel(String appName, String id, String identify) {
read.lock(); Map<String, Lease<InstanceInfo>> registerMap = registry.get(appName);
try { if (!CollectionUtils.isEmpty(registerMap)) {
Map<String, Lease<InstanceInfo>> registerMap = registry.get(appName); registerMap.remove(id);
if (!CollectionUtils.isEmpty(registerMap)) { AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, () -> identify);
registerMap.remove(id);
AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, () -> identify);
log.info("Clean up unhealthy nodes. Node id :: {}", id);
}
} finally { log.info("Clean up unhealthy nodes. Node id :: {}", id);
read.unlock();
} }
return true; return true;

@ -1,36 +0,0 @@
package cn.hippo4j.discovery.core;
/**
* Pair.
*
* @author chen.ma
* @date 2021/8/8 23:04
*/
public class Pair<E1, E2> {
public E1 first() {
return first;
}
public void setFirst(E1 first) {
this.first = first;
}
public E2 second() {
return second;
}
public void setSecond(E2 second) {
this.second = second;
}
private E1 first;
private E2 second;
public Pair(E1 first, E2 second) {
this.first = first;
this.second = second;
}
}

@ -1,36 +0,0 @@
package cn.hippo4j.discovery.core;
import cn.hippo4j.common.model.InstanceInfo.InstanceStatus;
/**
* Status override result.
*
* @author chen.ma
* @date 2021/8/8 23:11
*/
public class StatusOverrideResult {
public static StatusOverrideResult NO_MATCH = new StatusOverrideResult(false, null);
public static StatusOverrideResult matchingStatus(InstanceStatus status) {
return new StatusOverrideResult(true, status);
}
private final boolean matches;
private final InstanceStatus status;
private StatusOverrideResult(boolean matches, InstanceStatus status) {
this.matches = matches;
this.status = status;
}
public boolean matches() {
return matches;
}
public InstanceStatus status() {
return status;
}
}

@ -40,7 +40,7 @@ public class DiscoveryConfiguration {
String namespace = properties.getNamespace(); String namespace = properties.getNamespace();
String itemId = properties.getItemId(); String itemId = properties.getItemId();
String port = environment.getProperty("server.port"); String port = environment.getProperty("server.port");
String applicationName = environment.getProperty("spring.application.name"); String applicationName = environment.getProperty("spring.dynamic.thread-pool.item-id");
String active = environment.getProperty("spring.profiles.active", "UNKNOWN"); String active = environment.getProperty("spring.profiles.active", "UNKNOWN");
InstanceInfo instanceInfo = new InstanceInfo(); InstanceInfo instanceInfo = new InstanceInfo();

Loading…
Cancel
Save