|
|
|
@ -22,7 +22,6 @@ import java.util.Map;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
import com.tencent.cloud.common.metadata.MetadataContext;
|
|
|
|
|
import com.tencent.cloud.common.metadata.StaticMetadataManager;
|
|
|
|
@ -56,13 +55,10 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
import org.springframework.beans.BeanUtils;
|
|
|
|
|
import org.springframework.beans.factory.DisposableBean;
|
|
|
|
|
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
|
|
|
|
|
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
|
|
|
|
|
import org.springframework.context.event.EventListener;
|
|
|
|
|
import org.springframework.http.HttpHeaders;
|
|
|
|
|
|
|
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
|
|
import static org.springframework.util.ReflectionUtils.rethrowRuntimeException;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Service registry of Polaris.
|
|
|
|
@ -81,6 +77,7 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
|
|
|
|
|
|
|
|
|
|
private final StaticMetadataManager staticMetadataManager;
|
|
|
|
|
private final PolarisStatProperties polarisStatProperties;
|
|
|
|
|
private final ScheduledExecutorService heartbeatExecutor;
|
|
|
|
|
|
|
|
|
|
public PolarisServiceRegistry(PolarisDiscoveryProperties polarisDiscoveryProperties,
|
|
|
|
|
PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler polarisDiscoveryHandler,
|
|
|
|
@ -89,6 +86,15 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
|
|
|
|
|
this.polarisSDKContextManager = polarisSDKContextManager;
|
|
|
|
|
this.polarisDiscoveryHandler = polarisDiscoveryHandler;
|
|
|
|
|
this.staticMetadataManager = staticMetadataManager;
|
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotBlank(polarisDiscoveryProperties.getHealthCheckUrl())) {
|
|
|
|
|
this.heartbeatExecutor = Executors
|
|
|
|
|
.newSingleThreadScheduledExecutor(new NamedThreadFactory("polaris-heartbeat"));
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
this.heartbeatExecutor = null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.polarisStatProperties = polarisStatProperties;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -130,66 +136,50 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
|
|
|
|
|
try {
|
|
|
|
|
ProviderAPI providerClient = polarisSDKContextManager.getProviderAPI();
|
|
|
|
|
InstanceRegisterResponse instanceRegisterResponse;
|
|
|
|
|
|
|
|
|
|
if (!polarisDiscoveryProperties.getHeartbeatEnabled()) {
|
|
|
|
|
if (polarisDiscoveryProperties.getHeartbeatEnabled()) {
|
|
|
|
|
if (StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) {
|
|
|
|
|
instanceRegisterResponse = providerClient.registerInstance(instanceRegisterRequest);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
instanceRegisterRequest.setTtl(polarisDiscoveryProperties.getHeartbeatInterval());
|
|
|
|
|
instanceRegisterResponse = providerClient.register(instanceRegisterRequest);
|
|
|
|
|
InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest();
|
|
|
|
|
BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest);
|
|
|
|
|
heartbeatRequest.setInstanceID(instanceRegisterResponse.getInstanceId());
|
|
|
|
|
// Start the heartbeat thread after the registration is successful.
|
|
|
|
|
heartbeat(heartbeatRequest);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
// Heartbeat is disabled
|
|
|
|
|
instanceRegisterResponse = providerClient.register(instanceRegisterRequest);
|
|
|
|
|
LOGGER.info("Registered instance without heartbeat.");
|
|
|
|
|
} else {
|
|
|
|
|
// Heartbeat is enabled
|
|
|
|
|
instanceRegisterRequest.setTtl(polarisDiscoveryProperties.getHeartbeatInterval());
|
|
|
|
|
instanceRegisterResponse = providerClient.registerInstance(instanceRegisterRequest);
|
|
|
|
|
LOGGER.info("Registered instance with heartbeat enabled.");
|
|
|
|
|
|
|
|
|
|
// Heartbeat thread always starts, health check only affects heartbeat content
|
|
|
|
|
String healthCheckUrl = polarisDiscoveryProperties.getHealthCheckUrl();
|
|
|
|
|
ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
|
|
|
|
|
heartbeatExecutor.scheduleWithFixedDelay(() -> {
|
|
|
|
|
try {
|
|
|
|
|
boolean canSendHeartbeat = true;
|
|
|
|
|
if (StringUtils.isNotBlank(healthCheckUrl)) {
|
|
|
|
|
Map<String, String> headers = new HashMap<>(1);
|
|
|
|
|
headers.put(HttpHeaders.USER_AGENT, "polaris");
|
|
|
|
|
canSendHeartbeat = OkHttpUtil.checkUrl(
|
|
|
|
|
instanceRegisterRequest.getHost(),
|
|
|
|
|
instanceRegisterRequest.getPort(),
|
|
|
|
|
healthCheckUrl,
|
|
|
|
|
headers
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
registration.setInstanceId(instanceRegisterResponse.getInstanceId());
|
|
|
|
|
LOGGER.info("polaris registry, {} {} {} {}:{} {} {} {} {} register finished", polarisDiscoveryProperties.getNamespace(),
|
|
|
|
|
registration.getServiceId(), registration.getInstanceId(), registration.getHost(), registration.getPort(),
|
|
|
|
|
staticMetadataManager.getRegion(), staticMetadataManager.getZone(), staticMetadataManager.getCampus(),
|
|
|
|
|
staticMetadataManager.getMergedStaticMetadata());
|
|
|
|
|
if (Objects.nonNull(polarisStatProperties) && polarisStatProperties.isEnabled()) {
|
|
|
|
|
try {
|
|
|
|
|
StatReporter statReporter = (StatReporter) polarisSDKContextManager.getSDKContext().getPlugins()
|
|
|
|
|
.getPlugin(PluginTypes.STAT_REPORTER.getBaseType(), StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS);
|
|
|
|
|
if (Objects.nonNull(statReporter)) {
|
|
|
|
|
ReporterMetaInfo reporterMetaInfo = statReporter.metaInfo();
|
|
|
|
|
if (reporterMetaInfo.getPort() != null) {
|
|
|
|
|
LOGGER.info("Stat server started on port: " + reporterMetaInfo.getPort() + " (http)");
|
|
|
|
|
}
|
|
|
|
|
if (!canSendHeartbeat) {
|
|
|
|
|
LOGGER.warn("Health check failed, skip heartbeat this round. health check endpoint = {}", healthCheckUrl);
|
|
|
|
|
return;
|
|
|
|
|
else {
|
|
|
|
|
LOGGER.info("Stat server is set to type of Push gateway");
|
|
|
|
|
}
|
|
|
|
|
InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest();
|
|
|
|
|
BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest);
|
|
|
|
|
heartbeatRequest.setInstanceID(instanceRegisterResponse.getInstanceId());
|
|
|
|
|
providerClient.heartbeat(heartbeatRequest);
|
|
|
|
|
LOGGER.debug("Polaris heartbeat is sent successfully.");
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
LOGGER.error("Polaris heartbeat runtime error", e);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
polarisDiscoveryProperties.getHeartbeatInterval(),
|
|
|
|
|
polarisDiscoveryProperties.getHeartbeatInterval(),
|
|
|
|
|
TimeUnit.SECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
registration.setInstanceId(instanceRegisterResponse.getInstanceId());
|
|
|
|
|
LOGGER.info("polaris registry, {} {} {} {}:{} {} {} {} {} register finished",
|
|
|
|
|
instanceRegisterRequest.getNamespace(),
|
|
|
|
|
instanceRegisterRequest.getService(),
|
|
|
|
|
instanceRegisterResponse.getInstanceId(),
|
|
|
|
|
instanceRegisterRequest.getHost(),
|
|
|
|
|
instanceRegisterRequest.getPort(),
|
|
|
|
|
instanceRegisterRequest.getRegion(),
|
|
|
|
|
instanceRegisterRequest.getZone(),
|
|
|
|
|
instanceRegisterRequest.getCampus(),
|
|
|
|
|
instanceRegisterRequest.getMetadata());
|
|
|
|
|
|
|
|
|
|
// Start stat server and configure service
|
|
|
|
|
if (polarisStatProperties.isEnabled()) {
|
|
|
|
|
startStatServer();
|
|
|
|
|
else {
|
|
|
|
|
LOGGER.warn("Plugin StatReporter not found");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
LOGGER.warn("Stat server started error, ", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ServiceConfigImpl serviceConfig = (ServiceConfigImpl) polarisSDKContextManager.getSDKContext().getConfig()
|
|
|
|
@ -200,29 +190,8 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
|
|
|
|
|
PolarisSDKContextManager.isRegistered = true;
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
LOGGER.error("ERR_POLARIS_REGISTER, register failed...{},", registration, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void startStatServer() {
|
|
|
|
|
try {
|
|
|
|
|
StatReporter statReporter = (StatReporter) polarisSDKContextManager.getSDKContext().getPlugins()
|
|
|
|
|
.getPlugin(PluginTypes.STAT_REPORTER.getBaseType(), StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS);
|
|
|
|
|
if (Objects.nonNull(statReporter)) {
|
|
|
|
|
ReporterMetaInfo reporterMetaInfo = statReporter.metaInfo();
|
|
|
|
|
if (reporterMetaInfo.getPort() != null) {
|
|
|
|
|
LOGGER.info("Stat server started on port: " + reporterMetaInfo.getPort() + " (http)");
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
LOGGER.info("Stat server is set to type of Push gateway");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
LOGGER.warn("Plugin StatReporter not found");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
LOGGER.warn("Stat server started error, ", e);
|
|
|
|
|
LOGGER.error("polaris registry, {} register failed...{},", registration.getServiceId(), registration, e);
|
|
|
|
|
rethrowRuntimeException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -252,14 +221,21 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
LOGGER.error("ERR_POLARIS_DEREGISTER, de-register failed...{},", registration, e);
|
|
|
|
|
}
|
|
|
|
|
finally {
|
|
|
|
|
if (null != heartbeatExecutor) {
|
|
|
|
|
heartbeatExecutor.shutdown();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void close() {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void setStatus(PolarisRegistration registration, String status) {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -279,7 +255,40 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
|
|
|
|
|
return "DOWN";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Start the heartbeat thread.
|
|
|
|
|
* @param heartbeatRequest heartbeat request
|
|
|
|
|
*/
|
|
|
|
|
public void heartbeat(InstanceHeartbeatRequest heartbeatRequest) {
|
|
|
|
|
heartbeatExecutor.scheduleWithFixedDelay(() -> {
|
|
|
|
|
try {
|
|
|
|
|
// If the health check passes, the heartbeat will be reported.
|
|
|
|
|
// If it does not pass, the heartbeat will not be reported.
|
|
|
|
|
Map<String, String> headers = new HashMap<>(1);
|
|
|
|
|
headers.put(HttpHeaders.USER_AGENT, "polaris");
|
|
|
|
|
if (!OkHttpUtil.checkUrl(heartbeatRequest.getHost(), heartbeatRequest.getPort(),
|
|
|
|
|
polarisDiscoveryProperties.getHealthCheckUrl(), headers)) {
|
|
|
|
|
LOGGER.error("backend service health check failed. health check endpoint = {}",
|
|
|
|
|
polarisDiscoveryProperties.getHealthCheckUrl());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
polarisSDKContextManager.getProviderAPI().heartbeat(heartbeatRequest);
|
|
|
|
|
LOGGER.trace("Polaris heartbeat is sent");
|
|
|
|
|
}
|
|
|
|
|
catch (PolarisException e) {
|
|
|
|
|
LOGGER.error("polaris heartbeat error with code [{}]", e.getCode(), e);
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
LOGGER.error("polaris heartbeat runtime error", e);
|
|
|
|
|
}
|
|
|
|
|
}, polarisDiscoveryProperties.getHeartbeatInterval(), polarisDiscoveryProperties.getHeartbeatInterval(), SECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void destroy() {
|
|
|
|
|
if (heartbeatExecutor != null) {
|
|
|
|
|
heartbeatExecutor.shutdown();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|