From 4fe8602c7476d8666f2b02f7542ea4b255096d17 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Tue, 17 May 2022 11:07:35 +0800 Subject: [PATCH] fix:fix route not refreshing bug when first instance of one service up. (#174) --- CHANGELOG.md | 1 + .../PolarisDiscoveryAutoConfiguration.java | 5 +- ...sRefreshApplicationReadyEventListener.java | 70 ++++++++++++++++ .../refresh/PolarisRefreshConfiguration.java | 32 ++++++++ .../PolarisServiceStatusChangeListener.java | 80 +++++++++++++++++++ .../PolarisServiceChangeListener.java | 62 -------------- .../registry/PolarisServiceRegistry.java | 10 +-- ...larisServiceRegistryAutoConfiguration.java | 12 +-- .../loadbalancer/PolarisLoadBalancer.java | 23 +----- 9 files changed, 193 insertions(+), 102 deletions(-) create mode 100644 spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshApplicationReadyEventListener.java create mode 100644 spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshConfiguration.java create mode 100644 spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisServiceStatusChangeListener.java delete mode 100644 spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceChangeListener.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 85c31fadf..c8e3ed71d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,3 +2,4 @@ --- - [fix:fix wrong context data storage.](https://github.com/Tencent/spring-cloud-tencent/pull/170) +- [fix:fix route not refreshing bug when first instance of one service up.](https://github.com/Tencent/spring-cloud-tencent/pull/174) diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/PolarisDiscoveryAutoConfiguration.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/PolarisDiscoveryAutoConfiguration.java index 139f72508..cc8000f44 100644 --- a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/PolarisDiscoveryAutoConfiguration.java +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/PolarisDiscoveryAutoConfiguration.java @@ -19,6 +19,7 @@ package com.tencent.cloud.polaris.discovery; import com.tencent.cloud.polaris.discovery.reactive.PolarisReactiveDiscoveryClientConfiguration; +import com.tencent.cloud.polaris.discovery.refresh.PolarisRefreshConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; @@ -32,8 +33,8 @@ import org.springframework.context.annotation.Import; */ @Configuration(proxyBeanMethods = false) @ConditionalOnPolarisDiscoveryEnabled -@Import({ PolarisDiscoveryClientConfiguration.class, - PolarisReactiveDiscoveryClientConfiguration.class }) +@Import({PolarisDiscoveryClientConfiguration.class, + PolarisReactiveDiscoveryClientConfiguration.class, PolarisRefreshConfiguration.class}) public class PolarisDiscoveryAutoConfiguration { @Bean diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshApplicationReadyEventListener.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshApplicationReadyEventListener.java new file mode 100644 index 000000000..831573893 --- /dev/null +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshApplicationReadyEventListener.java @@ -0,0 +1,70 @@ +package com.tencent.cloud.polaris.discovery.refresh; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.client.util.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.cloud.client.discovery.event.HeartbeatEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.context.ApplicationListener; + +import static com.tencent.cloud.polaris.discovery.refresh.PolarisServiceStatusChangeListener.INDEX; + +/** + * Begin refresh when application is ready. + * + * @author Haotian Zhang + */ +public class PolarisRefreshApplicationReadyEventListener implements ApplicationListener, ApplicationEventPublisherAware { + + private static final Logger LOG = LoggerFactory.getLogger(PolarisRefreshConfiguration.class); + private static final int DELAY = 60; + private final PolarisDiscoveryHandler polarisDiscoveryHandler; + private final PolarisServiceStatusChangeListener polarisServiceStatusChangeListener; + private final ScheduledExecutorService refreshExecutor; + private ApplicationEventPublisher publisher; + + public PolarisRefreshApplicationReadyEventListener(PolarisDiscoveryHandler polarisDiscoveryHandler, PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) { + this.polarisDiscoveryHandler = polarisDiscoveryHandler; + this.polarisServiceStatusChangeListener = polarisServiceStatusChangeListener; + this.refreshExecutor = Executors.newSingleThreadScheduledExecutor( + new NamedThreadFactory("polaris-service-refresh")); + } + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + // Register service change listener. + polarisDiscoveryHandler.getSdkContext().getExtensions().getLocalRegistry() + .registerResourceListener(polarisServiceStatusChangeListener); + + // Begin scheduled refresh thread. + refresh(); + } + + /** + * Start the refresh thread. + */ + public void refresh() { + refreshExecutor.scheduleWithFixedDelay(() -> { + try { + // Trigger reload of gateway route cache. + this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement())); + } + catch (Exception e) { + LOG.error("refresh polaris service error.", e); + } + }, DELAY, DELAY, TimeUnit.SECONDS); + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.publisher = applicationEventPublisher; + } +} diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshConfiguration.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshConfiguration.java new file mode 100644 index 000000000..fe723bda7 --- /dev/null +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshConfiguration.java @@ -0,0 +1,32 @@ +package com.tencent.cloud.polaris.discovery.refresh; + +import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Configuration for listening the change of service status. + * + * @author Haotian Zhang + */ +@Configuration(proxyBeanMethods = false) +@ConditionalOnPolarisEnabled +public class PolarisRefreshConfiguration { + + @Bean + @ConditionalOnMissingBean + public PolarisServiceStatusChangeListener polarisServiceChangeListener() { + return new PolarisServiceStatusChangeListener(); + } + + @Bean + @ConditionalOnMissingBean + public PolarisRefreshApplicationReadyEventListener polarisServiceStatusApplicationReadyEventListener( + PolarisDiscoveryHandler polarisDiscoveryHandler, + PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) { + return new PolarisRefreshApplicationReadyEventListener(polarisDiscoveryHandler, polarisServiceStatusChangeListener); + } +} diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisServiceStatusChangeListener.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisServiceStatusChangeListener.java new file mode 100644 index 000000000..ccbc32e61 --- /dev/null +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisServiceStatusChangeListener.java @@ -0,0 +1,80 @@ +package com.tencent.cloud.polaris.discovery.refresh; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; +import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener; +import com.tencent.polaris.api.pojo.RegistryCacheValue; +import com.tencent.polaris.api.pojo.ServiceEventKey; +import com.tencent.polaris.client.pojo.ServiceInstancesByProto; +import com.tencent.polaris.client.pojo.ServicesByProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.cloud.client.discovery.event.HeartbeatEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.util.CollectionUtils; + +/** + * Change listener of Polaris service info. When service info is created or deleted, or, instance of service is from 0 to + * + * @author Haotian Zhang + */ +public class PolarisServiceStatusChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware { + + /** + * Index of service info status. + */ + public static final AtomicLong INDEX = new AtomicLong(0); + + private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceStatusChangeListener.class); + + private ApplicationEventPublisher publisher; + + @Override + public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, + RegistryCacheValue newValue) { + if (newValue.getEventType() == ServiceEventKey.EventType.SERVICE) { + if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) { + LOG.debug("receive service={} change event", svcEventKey); + Set oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream() + .map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet()); + Set newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream() + .map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet()); + + Sets.SetView addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet); + Sets.SetView deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet); + + if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) { + return; + } + LOG.info("Service status is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView); + + // Trigger reload of gateway route cache. + this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement())); + } + } + else if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE) { + if (oldValue instanceof ServiceInstancesByProto && newValue instanceof ServiceInstancesByProto) { + LOG.debug("receive service instances={} change event", svcEventKey); + ServiceInstancesByProto oldIns = (ServiceInstancesByProto) oldValue; + ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue; + if ((CollectionUtils.isEmpty(oldIns.getInstances()) && !CollectionUtils.isEmpty(newIns.getInstances())) || + (!CollectionUtils.isEmpty(oldIns.getInstances()) && CollectionUtils.isEmpty(newIns.getInstances()))) { + LOG.info("Service status of {} is update.", newIns.getService()); + + // Trigger reload of gateway route cache. + this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement())); + } + } + } + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.publisher = applicationEventPublisher; + } +} diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceChangeListener.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceChangeListener.java deleted file mode 100644 index d52628f28..000000000 --- a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceChangeListener.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.tencent.cloud.polaris.registry; - -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import com.google.common.collect.Sets; -import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener; -import com.tencent.polaris.api.pojo.RegistryCacheValue; -import com.tencent.polaris.api.pojo.ServiceEventKey; -import com.tencent.polaris.client.pojo.ServicesByProto; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.cloud.client.discovery.event.HeartbeatEvent; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; - -/** - * Change listener of Polaris service info. - * - * @author Haotian Zhang - */ -public class PolarisServiceChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware { - - private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceChangeListener.class); - - private static final AtomicInteger INDEX = new AtomicInteger(0); - - private ApplicationEventPublisher publisher; - - @Override - public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, - RegistryCacheValue newValue) { - if (newValue.getEventType() != ServiceEventKey.EventType.SERVICE) { - return; - } - if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) { - LOG.debug("receive service={} change event", svcEventKey); - Set oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream() - .map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet()); - Set newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream() - .map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet()); - - Sets.SetView addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet); - Sets.SetView deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet); - - if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) { - return; - } - LOG.info("Service info is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView); - - // Trigger reload of gateway route cache. - this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement())); - } - } - - @Override - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.publisher = applicationEventPublisher; - } -} diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistry.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistry.java index 6b171c1de..0d39c7262 100644 --- a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistry.java +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistry.java @@ -64,11 +64,9 @@ public class PolarisServiceRegistry implements ServiceRegistry { private final ScheduledExecutorService heartbeatExecutor; - private final PolarisServiceChangeListener polarisServiceChangeListener; - public PolarisServiceRegistry(PolarisDiscoveryProperties polarisDiscoveryProperties, PolarisDiscoveryHandler polarisDiscoveryHandler, - MetadataLocalProperties metadataLocalProperties, PolarisServiceChangeListener polarisServiceChangeListener) { + MetadataLocalProperties metadataLocalProperties) { this.polarisDiscoveryProperties = polarisDiscoveryProperties; this.polarisDiscoveryHandler = polarisDiscoveryHandler; this.metadataLocalProperties = metadataLocalProperties; @@ -80,8 +78,6 @@ public class PolarisServiceRegistry implements ServiceRegistry { else { this.heartbeatExecutor = null; } - - this.polarisServiceChangeListener = polarisServiceChangeListener; } @Override @@ -119,10 +115,6 @@ public class PolarisServiceRegistry implements ServiceRegistry { // Start the heartbeat thread after the registration is successful. heartbeat(heartbeatRequest); } - - // Register service change listener - polarisDiscoveryHandler.getSdkContext().getExtensions().getLocalRegistry() - .registerResourceListener(polarisServiceChangeListener); } catch (Exception e) { log.error("polaris registry, {} register failed...{},", diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistryAutoConfiguration.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistryAutoConfiguration.java index fba7be0bd..68f338c5d 100644 --- a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistryAutoConfiguration.java +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/registry/PolarisServiceRegistryAutoConfiguration.java @@ -27,7 +27,6 @@ import com.tencent.polaris.client.api.SDKContext; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration; @@ -54,9 +53,8 @@ public class PolarisServiceRegistryAutoConfiguration { @Bean public PolarisServiceRegistry polarisServiceRegistry( PolarisDiscoveryProperties polarisDiscoveryProperties, PolarisDiscoveryHandler polarisDiscoveryHandler, - MetadataLocalProperties metadataLocalProperties, PolarisServiceChangeListener polarisServiceChangeListener) { - return new PolarisServiceRegistry(polarisDiscoveryProperties, - polarisDiscoveryHandler, metadataLocalProperties, polarisServiceChangeListener); + MetadataLocalProperties metadataLocalProperties) { + return new PolarisServiceRegistry(polarisDiscoveryProperties, polarisDiscoveryHandler, metadataLocalProperties); } @Bean @@ -77,10 +75,4 @@ public class PolarisServiceRegistryAutoConfiguration { return new PolarisAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } - - @Bean - @ConditionalOnMissingBean - public PolarisServiceChangeListener polarisServiceChangeListener() { - return new PolarisServiceChangeListener(); - } } diff --git a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java index b9e0de6f1..574a42610 100644 --- a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java +++ b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java @@ -107,12 +107,7 @@ public class PolarisLoadBalancer extends DynamicServerListLoadBalancer { } private ServiceInstances getPolarisDiscoveryServiceInstances() { - List allServers = super.getAllServers(); - if (CollectionUtils.isEmpty(allServers)) { - return null; - } - String serviceName = ((PolarisServer) super.getAllServers().get(0)).getServiceInstances().getService(); - return getAllInstances(MetadataContext.LOCAL_NAMESPACE, serviceName).toServiceInstances(); + return getAllInstances(MetadataContext.LOCAL_NAMESPACE, name).toServiceInstances(); } private ServiceInstances getExtendDiscoveryServiceInstances() { @@ -121,26 +116,16 @@ public class PolarisLoadBalancer extends DynamicServerListLoadBalancer { return null; } ServiceInstances serviceInstances; - String serviceName; - // notice the difference between different service registries - if (StringUtils.isNotBlank( - allServers.get(0).getMetaInfo().getServiceIdForDiscovery())) { - serviceName = allServers.get(0).getMetaInfo().getServiceIdForDiscovery(); - } - else { - serviceName = allServers.get(0).getMetaInfo().getAppName(); - } - if (StringUtils.isBlank(serviceName)) { + if (StringUtils.isBlank(name)) { throw new IllegalStateException( "PolarisLoadBalancer only Server with AppName or ServiceIdForDiscovery attribute"); } - ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, - serviceName); + ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, name); List instances = new ArrayList<>(8); for (Server server : allServers) { DefaultInstance instance = new DefaultInstance(); instance.setNamespace(MetadataContext.LOCAL_NAMESPACE); - instance.setService(serviceName); + instance.setService(name); instance.setHealthy(server.isAlive()); instance.setProtocol(server.getScheme()); instance.setId(server.getId());