fix:fix route not refreshing bug when first instance of one service up.

pull/177/head
SkyeBeFreeman 2 years ago
parent 7ee56fd58d
commit 4e6a97fb04

@ -2,3 +2,4 @@
---
- [fix:fix wrong context data storage.](https://github.com/Tencent/spring-cloud-tencent/pull/176)
- [fix:fix route not refreshing bug when first instance of one service up.](https://github.com/Tencent/spring-cloud-tencent/pull/177)

@ -19,6 +19,7 @@
package com.tencent.cloud.polaris.discovery;
import com.tencent.cloud.polaris.discovery.reactive.PolarisReactiveDiscoveryClientConfiguration;
import com.tencent.cloud.polaris.discovery.refresh.PolarisRefreshConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
@ -32,12 +33,14 @@ import org.springframework.context.annotation.Import;
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnPolarisDiscoveryEnabled
@Import({ PolarisDiscoveryClientConfiguration.class, PolarisReactiveDiscoveryClientConfiguration.class })
@Import({PolarisDiscoveryClientConfiguration.class,
PolarisReactiveDiscoveryClientConfiguration.class, PolarisRefreshConfiguration.class})
public class PolarisDiscoveryAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public PolarisServiceDiscovery polarisServiceDiscovery(PolarisDiscoveryHandler polarisDiscoveryHandler) {
public PolarisServiceDiscovery polarisServiceDiscovery(
PolarisDiscoveryHandler polarisDiscoveryHandler) {
return new PolarisServiceDiscovery(polarisDiscoveryHandler);
}

@ -0,0 +1,70 @@
package com.tencent.cloud.polaris.discovery.refresh;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.client.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import static com.tencent.cloud.polaris.discovery.refresh.PolarisServiceStatusChangeListener.INDEX;
/**
* Begin refresh when application is ready.
*
* @author Haotian Zhang
*/
public class PolarisRefreshApplicationReadyEventListener implements ApplicationListener<ApplicationReadyEvent>, ApplicationEventPublisherAware {
private static final Logger LOG = LoggerFactory.getLogger(PolarisRefreshConfiguration.class);
private static final int DELAY = 60;
private final PolarisDiscoveryHandler polarisDiscoveryHandler;
private final PolarisServiceStatusChangeListener polarisServiceStatusChangeListener;
private final ScheduledExecutorService refreshExecutor;
private ApplicationEventPublisher publisher;
public PolarisRefreshApplicationReadyEventListener(PolarisDiscoveryHandler polarisDiscoveryHandler, PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) {
this.polarisDiscoveryHandler = polarisDiscoveryHandler;
this.polarisServiceStatusChangeListener = polarisServiceStatusChangeListener;
this.refreshExecutor = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("polaris-service-refresh"));
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// Register service change listener.
polarisDiscoveryHandler.getSdkContext().getExtensions().getLocalRegistry()
.registerResourceListener(polarisServiceStatusChangeListener);
// Begin scheduled refresh thread.
refresh();
}
/**
* Start the refresh thread.
*/
public void refresh() {
refreshExecutor.scheduleWithFixedDelay(() -> {
try {
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
catch (Exception e) {
LOG.error("refresh polaris service error.", e);
}
}, DELAY, DELAY, TimeUnit.SECONDS);
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}

@ -0,0 +1,32 @@
package com.tencent.cloud.polaris.discovery.refresh;
import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Configuration for listening the change of service status.
*
* @author Haotian Zhang
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnPolarisEnabled
public class PolarisRefreshConfiguration {
@Bean
@ConditionalOnMissingBean
public PolarisServiceStatusChangeListener polarisServiceChangeListener() {
return new PolarisServiceStatusChangeListener();
}
@Bean
@ConditionalOnMissingBean
public PolarisRefreshApplicationReadyEventListener polarisServiceStatusApplicationReadyEventListener(
PolarisDiscoveryHandler polarisDiscoveryHandler,
PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) {
return new PolarisRefreshApplicationReadyEventListener(polarisDiscoveryHandler, polarisServiceStatusChangeListener);
}
}

@ -0,0 +1,80 @@
package com.tencent.cloud.polaris.discovery.refresh;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
import com.tencent.polaris.client.pojo.ServicesByProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.util.CollectionUtils;
/**
* Change listener of Polaris service info. When service info is created or deleted, or, instance of service is from 0 to
*
* @author Haotian Zhang
*/
public class PolarisServiceStatusChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware {
/**
* Index of service info status.
*/
public static final AtomicLong INDEX = new AtomicLong(0);
private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceStatusChangeListener.class);
private ApplicationEventPublisher publisher;
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue,
RegistryCacheValue newValue) {
if (newValue.getEventType() == ServiceEventKey.EventType.SERVICE) {
if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) {
LOG.debug("receive service={} change event", svcEventKey);
Set<String> oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Set<String> newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Sets.SetView<String> addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet);
Sets.SetView<String> deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet);
if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) {
return;
}
LOG.info("Service status is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView);
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
}
else if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE) {
if (oldValue instanceof ServiceInstancesByProto && newValue instanceof ServiceInstancesByProto) {
LOG.debug("receive service instances={} change event", svcEventKey);
ServiceInstancesByProto oldIns = (ServiceInstancesByProto) oldValue;
ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue;
if ((CollectionUtils.isEmpty(oldIns.getInstances()) && !CollectionUtils.isEmpty(newIns.getInstances())) ||
(!CollectionUtils.isEmpty(oldIns.getInstances()) && CollectionUtils.isEmpty(newIns.getInstances()))) {
LOG.info("Service status of {} is update.", newIns.getService());
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
}
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}

@ -1,62 +0,0 @@
package com.tencent.cloud.polaris.registry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.client.pojo.ServicesByProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
/**
* Change listener of Polaris service info.
*
* @author Haotian Zhang
*/
public class PolarisServiceChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware {
private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceChangeListener.class);
private static final AtomicInteger INDEX = new AtomicInteger(0);
private ApplicationEventPublisher publisher;
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue,
RegistryCacheValue newValue) {
if (newValue.getEventType() != ServiceEventKey.EventType.SERVICE) {
return;
}
if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) {
LOG.debug("receive service={} change event", svcEventKey);
Set<String> oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Set<String> newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Sets.SetView<String> addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet);
Sets.SetView<String> deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet);
if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) {
return;
}
LOG.info("Service info is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView);
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}

@ -63,11 +63,9 @@ public class PolarisServiceRegistry implements ServiceRegistry<Registration> {
private final ScheduledExecutorService heartbeatExecutor;
private final PolarisServiceChangeListener polarisServiceChangeListener;
public PolarisServiceRegistry(PolarisDiscoveryProperties polarisDiscoveryProperties,
PolarisDiscoveryHandler polarisDiscoveryHandler,
MetadataLocalProperties metadataLocalProperties, PolarisServiceChangeListener polarisServiceChangeListener) {
MetadataLocalProperties metadataLocalProperties) {
this.polarisDiscoveryProperties = polarisDiscoveryProperties;
this.polarisDiscoveryHandler = polarisDiscoveryHandler;
this.metadataLocalProperties = metadataLocalProperties;
@ -79,8 +77,6 @@ public class PolarisServiceRegistry implements ServiceRegistry<Registration> {
else {
this.heartbeatExecutor = null;
}
this.polarisServiceChangeListener = polarisServiceChangeListener;
}
@Override
@ -117,10 +113,6 @@ public class PolarisServiceRegistry implements ServiceRegistry<Registration> {
// Start the heartbeat thread after the registration is successful.
heartbeat(heartbeatRequest);
}
// Register service change listener
polarisDiscoveryHandler.getSdkContext().getExtensions().getLocalRegistry()
.registerResourceListener(polarisServiceChangeListener);
}
catch (Exception e) {
log.error("polaris registry, {} register failed...{},", registration.getServiceId(), registration, e);

@ -27,7 +27,6 @@ import com.tencent.polaris.client.api.SDKContext;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration;
@ -52,9 +51,8 @@ public class PolarisServiceRegistryAutoConfiguration {
@Bean
public PolarisServiceRegistry polarisServiceRegistry(
PolarisDiscoveryProperties polarisDiscoveryProperties, PolarisDiscoveryHandler polarisDiscoveryHandler,
MetadataLocalProperties metadataLocalProperties, PolarisServiceChangeListener polarisServiceChangeListener) {
return new PolarisServiceRegistry(polarisDiscoveryProperties,
polarisDiscoveryHandler, metadataLocalProperties, polarisServiceChangeListener);
MetadataLocalProperties metadataLocalProperties) {
return new PolarisServiceRegistry(polarisDiscoveryProperties, polarisDiscoveryHandler, metadataLocalProperties);
}
@Bean
@ -71,10 +69,4 @@ public class PolarisServiceRegistryAutoConfiguration {
AutoServiceRegistrationProperties autoServiceRegistrationProperties, PolarisRegistration registration) {
return new PolarisAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
@Bean
@ConditionalOnMissingBean
public PolarisServiceChangeListener polarisServiceChangeListener() {
return new PolarisServiceChangeListener();
}
}

Loading…
Cancel
Save