fix:fix route not refreshing bug when first instance of one service up.

pull/174/head
SkyeBeFreeman 3 years ago
parent 266ee13bd2
commit 900acc4421

@ -2,3 +2,4 @@
---
- [fix:fix wrong context data storage.](https://github.com/Tencent/spring-cloud-tencent/pull/170)
- [fix:fix route not refreshing bug when first instance of one service up.](https://github.com/Tencent/spring-cloud-tencent/pull/174)

@ -19,6 +19,7 @@
package com.tencent.cloud.polaris.discovery;
import com.tencent.cloud.polaris.discovery.reactive.PolarisReactiveDiscoveryClientConfiguration;
import com.tencent.cloud.polaris.discovery.refresh.PolarisRefreshConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
@ -32,8 +33,8 @@ import org.springframework.context.annotation.Import;
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnPolarisDiscoveryEnabled
@Import({ PolarisDiscoveryClientConfiguration.class,
PolarisReactiveDiscoveryClientConfiguration.class })
@Import({PolarisDiscoveryClientConfiguration.class,
PolarisReactiveDiscoveryClientConfiguration.class, PolarisRefreshConfiguration.class})
public class PolarisDiscoveryAutoConfiguration {
@Bean

@ -0,0 +1,70 @@
package com.tencent.cloud.polaris.discovery.refresh;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.client.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import static com.tencent.cloud.polaris.discovery.refresh.PolarisServiceStatusChangeListener.INDEX;
/**
* Begin refresh when application is ready.
*
* @author Haotian Zhang
*/
public class PolarisRefreshApplicationReadyEventListener implements ApplicationListener<ApplicationReadyEvent>, ApplicationEventPublisherAware {
private static final Logger LOG = LoggerFactory.getLogger(PolarisRefreshConfiguration.class);
private static final int DELAY = 60;
private final PolarisDiscoveryHandler polarisDiscoveryHandler;
private final PolarisServiceStatusChangeListener polarisServiceStatusChangeListener;
private final ScheduledExecutorService refreshExecutor;
private ApplicationEventPublisher publisher;
public PolarisRefreshApplicationReadyEventListener(PolarisDiscoveryHandler polarisDiscoveryHandler, PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) {
this.polarisDiscoveryHandler = polarisDiscoveryHandler;
this.polarisServiceStatusChangeListener = polarisServiceStatusChangeListener;
this.refreshExecutor = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("polaris-service-refresh"));
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// Register service change listener.
polarisDiscoveryHandler.getSdkContext().getExtensions().getLocalRegistry()
.registerResourceListener(polarisServiceStatusChangeListener);
// Begin scheduled refresh thread.
refresh();
}
/**
* Start the refresh thread.
*/
public void refresh() {
refreshExecutor.scheduleWithFixedDelay(() -> {
try {
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
catch (Exception e) {
LOG.error("refresh polaris service error.", e);
}
}, DELAY, DELAY, TimeUnit.SECONDS);
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}

@ -0,0 +1,32 @@
package com.tencent.cloud.polaris.discovery.refresh;
import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Configuration for listening the change of service status.
*
* @author Haotian Zhang
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnPolarisEnabled
public class PolarisRefreshConfiguration {
@Bean
@ConditionalOnMissingBean
public PolarisServiceStatusChangeListener polarisServiceChangeListener() {
return new PolarisServiceStatusChangeListener();
}
@Bean
@ConditionalOnMissingBean
public PolarisRefreshApplicationReadyEventListener polarisServiceStatusApplicationReadyEventListener(
PolarisDiscoveryHandler polarisDiscoveryHandler,
PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) {
return new PolarisRefreshApplicationReadyEventListener(polarisDiscoveryHandler, polarisServiceStatusChangeListener);
}
}

@ -0,0 +1,80 @@
package com.tencent.cloud.polaris.discovery.refresh;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
import com.tencent.polaris.client.pojo.ServicesByProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.util.CollectionUtils;
/**
* Change listener of Polaris service info. When service info is created or deleted, or, instance of service is from 0 to
*
* @author Haotian Zhang
*/
public class PolarisServiceStatusChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware {
/**
* Index of service info status.
*/
public static final AtomicLong INDEX = new AtomicLong(0);
private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceStatusChangeListener.class);
private ApplicationEventPublisher publisher;
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue,
RegistryCacheValue newValue) {
if (newValue.getEventType() == ServiceEventKey.EventType.SERVICE) {
if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) {
LOG.debug("receive service={} change event", svcEventKey);
Set<String> oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Set<String> newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Sets.SetView<String> addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet);
Sets.SetView<String> deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet);
if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) {
return;
}
LOG.info("Service status is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView);
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
}
else if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE) {
if (oldValue instanceof ServiceInstancesByProto && newValue instanceof ServiceInstancesByProto) {
LOG.debug("receive service instances={} change event", svcEventKey);
ServiceInstancesByProto oldIns = (ServiceInstancesByProto) oldValue;
ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue;
if ((CollectionUtils.isEmpty(oldIns.getInstances()) && !CollectionUtils.isEmpty(newIns.getInstances())) ||
(!CollectionUtils.isEmpty(oldIns.getInstances()) && CollectionUtils.isEmpty(newIns.getInstances()))) {
LOG.info("Service status of {} is update.", newIns.getService());
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
}
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}

@ -1,62 +0,0 @@
package com.tencent.cloud.polaris.registry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.client.pojo.ServicesByProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
/**
* Change listener of Polaris service info.
*
* @author Haotian Zhang
*/
public class PolarisServiceChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware {
private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceChangeListener.class);
private static final AtomicInteger INDEX = new AtomicInteger(0);
private ApplicationEventPublisher publisher;
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue,
RegistryCacheValue newValue) {
if (newValue.getEventType() != ServiceEventKey.EventType.SERVICE) {
return;
}
if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) {
LOG.debug("receive service={} change event", svcEventKey);
Set<String> oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Set<String> newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream()
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
Sets.SetView<String> addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet);
Sets.SetView<String> deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet);
if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) {
return;
}
LOG.info("Service info is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView);
// Trigger reload of gateway route cache.
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}

@ -64,11 +64,9 @@ public class PolarisServiceRegistry implements ServiceRegistry<Registration> {
private final ScheduledExecutorService heartbeatExecutor;
private final PolarisServiceChangeListener polarisServiceChangeListener;
public PolarisServiceRegistry(PolarisDiscoveryProperties polarisDiscoveryProperties,
PolarisDiscoveryHandler polarisDiscoveryHandler,
MetadataLocalProperties metadataLocalProperties, PolarisServiceChangeListener polarisServiceChangeListener) {
MetadataLocalProperties metadataLocalProperties) {
this.polarisDiscoveryProperties = polarisDiscoveryProperties;
this.polarisDiscoveryHandler = polarisDiscoveryHandler;
this.metadataLocalProperties = metadataLocalProperties;
@ -80,8 +78,6 @@ public class PolarisServiceRegistry implements ServiceRegistry<Registration> {
else {
this.heartbeatExecutor = null;
}
this.polarisServiceChangeListener = polarisServiceChangeListener;
}
@Override
@ -119,10 +115,6 @@ public class PolarisServiceRegistry implements ServiceRegistry<Registration> {
// Start the heartbeat thread after the registration is successful.
heartbeat(heartbeatRequest);
}
// Register service change listener
polarisDiscoveryHandler.getSdkContext().getExtensions().getLocalRegistry()
.registerResourceListener(polarisServiceChangeListener);
}
catch (Exception e) {
log.error("polaris registry, {} register failed...{},",

@ -27,7 +27,6 @@ import com.tencent.polaris.client.api.SDKContext;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration;
@ -54,9 +53,8 @@ public class PolarisServiceRegistryAutoConfiguration {
@Bean
public PolarisServiceRegistry polarisServiceRegistry(
PolarisDiscoveryProperties polarisDiscoveryProperties, PolarisDiscoveryHandler polarisDiscoveryHandler,
MetadataLocalProperties metadataLocalProperties, PolarisServiceChangeListener polarisServiceChangeListener) {
return new PolarisServiceRegistry(polarisDiscoveryProperties,
polarisDiscoveryHandler, metadataLocalProperties, polarisServiceChangeListener);
MetadataLocalProperties metadataLocalProperties) {
return new PolarisServiceRegistry(polarisDiscoveryProperties, polarisDiscoveryHandler, metadataLocalProperties);
}
@Bean
@ -77,10 +75,4 @@ public class PolarisServiceRegistryAutoConfiguration {
return new PolarisAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
@Bean
@ConditionalOnMissingBean
public PolarisServiceChangeListener polarisServiceChangeListener() {
return new PolarisServiceChangeListener();
}
}

@ -107,12 +107,7 @@ public class PolarisLoadBalancer extends DynamicServerListLoadBalancer<Server> {
}
private ServiceInstances getPolarisDiscoveryServiceInstances() {
List<Server> allServers = super.getAllServers();
if (CollectionUtils.isEmpty(allServers)) {
return null;
}
String serviceName = ((PolarisServer) super.getAllServers().get(0)).getServiceInstances().getService();
return getAllInstances(MetadataContext.LOCAL_NAMESPACE, serviceName).toServiceInstances();
return getAllInstances(MetadataContext.LOCAL_NAMESPACE, name).toServiceInstances();
}
private ServiceInstances getExtendDiscoveryServiceInstances() {
@ -121,26 +116,16 @@ public class PolarisLoadBalancer extends DynamicServerListLoadBalancer<Server> {
return null;
}
ServiceInstances serviceInstances;
String serviceName;
// notice the difference between different service registries
if (StringUtils.isNotBlank(
allServers.get(0).getMetaInfo().getServiceIdForDiscovery())) {
serviceName = allServers.get(0).getMetaInfo().getServiceIdForDiscovery();
}
else {
serviceName = allServers.get(0).getMetaInfo().getAppName();
}
if (StringUtils.isBlank(serviceName)) {
if (StringUtils.isBlank(name)) {
throw new IllegalStateException(
"PolarisLoadBalancer only Server with AppName or ServiceIdForDiscovery attribute");
}
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE,
serviceName);
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, name);
List<Instance> instances = new ArrayList<>(8);
for (Server server : allServers) {
DefaultInstance instance = new DefaultInstance();
instance.setNamespace(MetadataContext.LOCAL_NAMESPACE);
instance.setService(serviceName);
instance.setService(name);
instance.setHealthy(server.isAlive());
instance.setProtocol(server.getScheme());
instance.setId(server.getId());

Loading…
Cancel
Save