fix optimize heartbeat control in service registry

pull/1621/head
zihenz 3 months ago
parent 9d90295916
commit 444804a2fd

@ -55,6 +55,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry; import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
@ -87,17 +89,10 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
this.polarisDiscoveryHandler = polarisDiscoveryHandler; this.polarisDiscoveryHandler = polarisDiscoveryHandler;
this.staticMetadataManager = staticMetadataManager; this.staticMetadataManager = staticMetadataManager;
this.polarisStatProperties = polarisStatProperties; this.polarisStatProperties = polarisStatProperties;
// Only create heartbeat executor if enabled
if (polarisDiscoveryProperties.getHeartbeatEnabled()) {
this.heartbeatExecutor = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("polaris-heartbeat"));
}
} }
@Override @Override
public void register(PolarisRegistration registration) { public void register(PolarisRegistration registration) {
if (StringUtils.isBlank(registration.getServiceId())) { if (StringUtils.isBlank(registration.getServiceId())) {
LOGGER.warn("No service to register for polaris client..."); LOGGER.warn("No service to register for polaris client...");
return; return;
@ -118,7 +113,6 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
instanceRegisterRequest.setRegion(staticMetadataManager.getRegion()); instanceRegisterRequest.setRegion(staticMetadataManager.getRegion());
instanceRegisterRequest.setZone(staticMetadataManager.getZone()); instanceRegisterRequest.setZone(staticMetadataManager.getZone());
instanceRegisterRequest.setCampus(staticMetadataManager.getCampus()); instanceRegisterRequest.setCampus(staticMetadataManager.getCampus());
instanceRegisterRequest.setTtl(polarisDiscoveryProperties.getHeartbeatInterval());
instanceRegisterRequest.setMetadata(registration.getMetadata()); instanceRegisterRequest.setMetadata(registration.getMetadata());
instanceRegisterRequest.setExtendedMetadata(registration.getExtendedMetadata()); instanceRegisterRequest.setExtendedMetadata(registration.getExtendedMetadata());
instanceRegisterRequest.setProtocol(polarisDiscoveryProperties.getProtocol()); instanceRegisterRequest.setProtocol(polarisDiscoveryProperties.getProtocol());
@ -135,43 +129,37 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
try { try {
ProviderAPI providerClient = polarisSDKContextManager.getProviderAPI(); ProviderAPI providerClient = polarisSDKContextManager.getProviderAPI();
InstanceRegisterResponse instanceRegisterResponse; InstanceRegisterResponse instanceRegisterResponse;
if (!polarisDiscoveryProperties.getHeartbeatEnabled() || StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) {
instanceRegisterResponse = providerClient.registerInstance(instanceRegisterRequest); // Always use registerInstance to avoid automatic heartbeat
} instanceRegisterRequest.setTtl(polarisDiscoveryProperties.getHeartbeatEnabled() ?
else { polarisDiscoveryProperties.getHeartbeatInterval() : 2592000);
instanceRegisterResponse = providerClient.register(instanceRegisterRequest); instanceRegisterResponse = providerClient.registerInstance(instanceRegisterRequest);
if (heartbeatExecutor != null) {
InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest(); // Only start heartbeat if explicitly enabled and health check URL is provided
BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest); if (polarisDiscoveryProperties.getHeartbeatEnabled() &&
heartbeatRequest.setInstanceID(instanceRegisterResponse.getInstanceId()); StringUtils.isNotBlank(polarisDiscoveryProperties.getHealthCheckUrl())) {
heartbeat(heartbeatRequest); startHeartbeat(instanceRegisterRequest, instanceRegisterResponse);
} LOGGER.info("Registered instance with heartbeat enabled and health check URL.");
} else {
stopHeartbeat(); // Ensure no heartbeat is running
LOGGER.info("Registered instance without heartbeat.");
} }
registration.setInstanceId(instanceRegisterResponse.getInstanceId()); registration.setInstanceId(instanceRegisterResponse.getInstanceId());
LOGGER.info("polaris registry, {} {} {} {}:{} {} {} {} {} register finished", polarisDiscoveryProperties.getNamespace(), LOGGER.info("polaris registry, {} {} {} {}:{} {} {} {} {} register finished",
registration.getServiceId(), registration.getInstanceId(), registration.getHost(), registration.getPort(), instanceRegisterRequest.getNamespace(),
staticMetadataManager.getRegion(), staticMetadataManager.getZone(), staticMetadataManager.getCampus(), instanceRegisterRequest.getService(),
staticMetadataManager.getMergedStaticMetadata()); instanceRegisterResponse.getInstanceId(),
if (Objects.nonNull(polarisStatProperties) && polarisStatProperties.isEnabled()) { instanceRegisterRequest.getHost(),
try { instanceRegisterRequest.getPort(),
StatReporter statReporter = (StatReporter) polarisSDKContextManager.getSDKContext().getPlugins() instanceRegisterRequest.getRegion(),
.getPlugin(PluginTypes.STAT_REPORTER.getBaseType(), StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS); instanceRegisterRequest.getZone(),
if (Objects.nonNull(statReporter)) { instanceRegisterRequest.getCampus(),
ReporterMetaInfo reporterMetaInfo = statReporter.metaInfo(); instanceRegisterRequest.getMetadata());
if (reporterMetaInfo.getPort() != null) {
LOGGER.info("Stat server started on port: " + reporterMetaInfo.getPort() + " (http)"); // Start stat server and configure service
} if (polarisStatProperties.isEnabled()) {
else { startStatServer();
LOGGER.info("Stat server is set to type of Push gateway");
}
}
else {
LOGGER.warn("Plugin StatReporter not found");
}
}
catch (Exception e) {
LOGGER.warn("Stat server started error, ", e);
}
} }
ServiceConfigImpl serviceConfig = (ServiceConfigImpl) polarisSDKContextManager.getSDKContext().getConfig() ServiceConfigImpl serviceConfig = (ServiceConfigImpl) polarisSDKContextManager.getSDKContext().getConfig()
@ -182,8 +170,108 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
PolarisSDKContextManager.isRegistered = true; PolarisSDKContextManager.isRegistered = true;
} }
catch (Exception e) { catch (Exception e) {
LOGGER.error("polaris registry, {} register failed...{},", registration.getServiceId(), registration, e); LOGGER.error("ERR_POLARIS_REGISTER, register failed...{},", registration, e);
rethrowRuntimeException(e); }
}
private void startStatServer() {
try {
StatReporter statReporter = (StatReporter) polarisSDKContextManager.getSDKContext().getPlugins()
.getPlugin(PluginTypes.STAT_REPORTER.getBaseType(), StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS);
if (Objects.nonNull(statReporter)) {
ReporterMetaInfo reporterMetaInfo = statReporter.metaInfo();
if (reporterMetaInfo.getPort() != null) {
LOGGER.info("Stat server started on port: " + reporterMetaInfo.getPort() + " (http)");
}
else {
LOGGER.info("Stat server is set to type of Push gateway");
}
}
else {
LOGGER.warn("Plugin StatReporter not found");
}
}
catch (Exception e) {
LOGGER.warn("Stat server started error, ", e);
}
}
private void startHeartbeat(InstanceRegisterRequest instanceRegisterRequest, InstanceRegisterResponse instanceRegisterResponse) {
// Double check heartbeat is enabled and health check URL is not blank
if (!polarisDiscoveryProperties.getHeartbeatEnabled() || StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) {
stopHeartbeat();
LOGGER.info("Heartbeat is disabled or health check URL is blank, skipping heartbeat initialization.");
return;
}
// Stop any existing heartbeat task before starting a new one
stopHeartbeat();
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("polaris-heartbeat"));
LOGGER.info("Created new heartbeat executor.");
InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest();
BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest);
heartbeatRequest.setInstanceID(instanceRegisterResponse.getInstanceId());
heartbeatExecutor.scheduleWithFixedDelay(() -> {
try {
// Check if heartbeat should still be running
if (!polarisDiscoveryProperties.getHeartbeatEnabled() ||
StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) {
LOGGER.info("Heartbeat conditions no longer met, stopping heartbeat task.");
stopHeartbeat();
return;
}
// Perform health check
Map<String, String> headers = new HashMap<>(1);
headers.put(HttpHeaders.USER_AGENT, "polaris");
if (!OkHttpUtil.checkUrl(heartbeatRequest.getHost(), heartbeatRequest.getPort(),
polarisDiscoveryProperties.getHealthCheckUrl(), headers)) {
LOGGER.error("Backend service health check failed. health check endpoint = {}",
polarisDiscoveryProperties.getHealthCheckUrl());
return;
}
// Send heartbeat
polarisSDKContextManager.getProviderAPI().heartbeat(heartbeatRequest);
LOGGER.debug("Polaris heartbeat is sent successfully.");
}
catch (PolarisException e) {
LOGGER.error("Polaris heartbeat error with code [{}]", e.getCode(), e);
}
catch (Exception e) {
LOGGER.error("Polaris heartbeat runtime error", e);
}
}, polarisDiscoveryProperties.getHeartbeatInterval(), polarisDiscoveryProperties.getHeartbeatInterval(), SECONDS);
LOGGER.info("Heartbeat task scheduled with interval {} seconds.", polarisDiscoveryProperties.getHeartbeatInterval());
}
private void stopHeartbeat() {
if (heartbeatExecutor != null) {
try {
heartbeatExecutor.shutdownNow();
if (heartbeatExecutor.awaitTermination(5, SECONDS)) {
LOGGER.info("Polaris heartbeat task stopped gracefully.");
} else {
LOGGER.warn("Polaris heartbeat task did not terminate in time.");
}
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while stopping heartbeat task.", e);
Thread.currentThread().interrupt();
} finally {
heartbeatExecutor = null;
}
}
}
@EventListener(RefreshScopeRefreshedEvent.class)
public void onPropertyRefreshed() {
if (!polarisDiscoveryProperties.getHeartbeatEnabled()) {
LOGGER.info("Heartbeat disabled via property refresh, stopping heartbeat task.");
stopHeartbeat();
} }
} }
@ -196,6 +284,8 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
return; return;
} }
stopHeartbeat();
InstanceDeregisterRequest deRegisterRequest = new InstanceDeregisterRequest(); InstanceDeregisterRequest deRegisterRequest = new InstanceDeregisterRequest();
deRegisterRequest.setInstanceID(registration.getInstanceId()); deRegisterRequest.setInstanceID(registration.getInstanceId());
deRegisterRequest.setToken(polarisDiscoveryProperties.getToken()); deRegisterRequest.setToken(polarisDiscoveryProperties.getToken());
@ -213,21 +303,15 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
catch (Exception e) { catch (Exception e) {
LOGGER.error("ERR_POLARIS_DEREGISTER, de-register failed...{},", registration, e); LOGGER.error("ERR_POLARIS_DEREGISTER, de-register failed...{},", registration, e);
} }
finally {
if (null != heartbeatExecutor) {
heartbeatExecutor.shutdown();
}
}
} }
@Override @Override
public void close() { public void close() {
stopHeartbeat();
} }
@Override @Override
public void setStatus(PolarisRegistration registration, String status) { public void setStatus(PolarisRegistration registration, String status) {
} }
@Override @Override
@ -247,58 +331,8 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati
return "DOWN"; return "DOWN";
} }
/**
* Start the heartbeat thread.
* @param heartbeatRequest heartbeat request
*/
public void heartbeat(InstanceHeartbeatRequest heartbeatRequest) {
// Stop heartbeat if disabled
if (!polarisDiscoveryProperties.getHeartbeatEnabled()) {
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdown();
heartbeatExecutor = null;
}
return;
}
// Don't proceed if executor is null
if (heartbeatExecutor == null) {
return;
}
heartbeatExecutor.scheduleWithFixedDelay(() -> {
try {
// Skip heartbeat if disabled or health check URL is not set
if (!polarisDiscoveryProperties.getHeartbeatEnabled() ||
StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) {
return;
}
Map<String, String> headers = new HashMap<>(1);
headers.put(HttpHeaders.USER_AGENT, "polaris");
if (!OkHttpUtil.checkUrl(heartbeatRequest.getHost(), heartbeatRequest.getPort(),
polarisDiscoveryProperties.getHealthCheckUrl(), headers)) {
LOGGER.error("backend service health check failed. health check endpoint = {}",
polarisDiscoveryProperties.getHealthCheckUrl());
return;
}
polarisSDKContextManager.getProviderAPI().heartbeat(heartbeatRequest);
LOGGER.trace("Polaris heartbeat is sent");
}
catch (PolarisException e) {
LOGGER.error("polaris heartbeat error with code [{}]", e.getCode(), e);
}
catch (Exception e) {
LOGGER.error("polaris heartbeat runtime error", e);
}
}, polarisDiscoveryProperties.getHeartbeatInterval(), polarisDiscoveryProperties.getHeartbeatInterval(), SECONDS);
}
@Override @Override
public void destroy() { public void destroy() {
if (heartbeatExecutor != null) { stopHeartbeat();
heartbeatExecutor.shutdown();
}
} }
} }

Loading…
Cancel
Save