Merge pull request #177 from SkyeBeFreeman/2020.0
fix:fix route not refreshing bug when first instance of one service up.pull/195/head
commit
0973cd8437
@ -0,0 +1,70 @@
|
|||||||
|
package com.tencent.cloud.polaris.discovery.refresh;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||||
|
import com.tencent.polaris.client.util.NamedThreadFactory;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||||
|
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.context.ApplicationEventPublisherAware;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
|
||||||
|
import static com.tencent.cloud.polaris.discovery.refresh.PolarisServiceStatusChangeListener.INDEX;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin refresh when application is ready.
|
||||||
|
*
|
||||||
|
* @author Haotian Zhang
|
||||||
|
*/
|
||||||
|
public class PolarisRefreshApplicationReadyEventListener implements ApplicationListener<ApplicationReadyEvent>, ApplicationEventPublisherAware {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(PolarisRefreshConfiguration.class);
|
||||||
|
private static final int DELAY = 60;
|
||||||
|
private final PolarisDiscoveryHandler polarisDiscoveryHandler;
|
||||||
|
private final PolarisServiceStatusChangeListener polarisServiceStatusChangeListener;
|
||||||
|
private final ScheduledExecutorService refreshExecutor;
|
||||||
|
private ApplicationEventPublisher publisher;
|
||||||
|
|
||||||
|
public PolarisRefreshApplicationReadyEventListener(PolarisDiscoveryHandler polarisDiscoveryHandler, PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) {
|
||||||
|
this.polarisDiscoveryHandler = polarisDiscoveryHandler;
|
||||||
|
this.polarisServiceStatusChangeListener = polarisServiceStatusChangeListener;
|
||||||
|
this.refreshExecutor = Executors.newSingleThreadScheduledExecutor(
|
||||||
|
new NamedThreadFactory("polaris-service-refresh"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(ApplicationReadyEvent event) {
|
||||||
|
// Register service change listener.
|
||||||
|
polarisDiscoveryHandler.getSdkContext().getExtensions().getLocalRegistry()
|
||||||
|
.registerResourceListener(polarisServiceStatusChangeListener);
|
||||||
|
|
||||||
|
// Begin scheduled refresh thread.
|
||||||
|
refresh();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the refresh thread.
|
||||||
|
*/
|
||||||
|
public void refresh() {
|
||||||
|
refreshExecutor.scheduleWithFixedDelay(() -> {
|
||||||
|
try {
|
||||||
|
// Trigger reload of gateway route cache.
|
||||||
|
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOG.error("refresh polaris service error.", e);
|
||||||
|
}
|
||||||
|
}, DELAY, DELAY, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
||||||
|
this.publisher = applicationEventPublisher;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,32 @@
|
|||||||
|
package com.tencent.cloud.polaris.discovery.refresh;
|
||||||
|
|
||||||
|
import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled;
|
||||||
|
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||||
|
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration for listening the change of service status.
|
||||||
|
*
|
||||||
|
* @author Haotian Zhang
|
||||||
|
*/
|
||||||
|
@Configuration(proxyBeanMethods = false)
|
||||||
|
@ConditionalOnPolarisEnabled
|
||||||
|
public class PolarisRefreshConfiguration {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnMissingBean
|
||||||
|
public PolarisServiceStatusChangeListener polarisServiceChangeListener() {
|
||||||
|
return new PolarisServiceStatusChangeListener();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnMissingBean
|
||||||
|
public PolarisRefreshApplicationReadyEventListener polarisServiceStatusApplicationReadyEventListener(
|
||||||
|
PolarisDiscoveryHandler polarisDiscoveryHandler,
|
||||||
|
PolarisServiceStatusChangeListener polarisServiceStatusChangeListener) {
|
||||||
|
return new PolarisRefreshApplicationReadyEventListener(polarisDiscoveryHandler, polarisServiceStatusChangeListener);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,80 @@
|
|||||||
|
package com.tencent.cloud.polaris.discovery.refresh;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
|
||||||
|
import com.tencent.polaris.api.pojo.RegistryCacheValue;
|
||||||
|
import com.tencent.polaris.api.pojo.ServiceEventKey;
|
||||||
|
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
|
||||||
|
import com.tencent.polaris.client.pojo.ServicesByProto;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.context.ApplicationEventPublisherAware;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Change listener of Polaris service info. When service info is created or deleted, or, instance of service is from 0 to
|
||||||
|
*
|
||||||
|
* @author Haotian Zhang
|
||||||
|
*/
|
||||||
|
public class PolarisServiceStatusChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Index of service info status.
|
||||||
|
*/
|
||||||
|
public static final AtomicLong INDEX = new AtomicLong(0);
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceStatusChangeListener.class);
|
||||||
|
|
||||||
|
private ApplicationEventPublisher publisher;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue,
|
||||||
|
RegistryCacheValue newValue) {
|
||||||
|
if (newValue.getEventType() == ServiceEventKey.EventType.SERVICE) {
|
||||||
|
if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) {
|
||||||
|
LOG.debug("receive service={} change event", svcEventKey);
|
||||||
|
Set<String> oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream()
|
||||||
|
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
|
||||||
|
Set<String> newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream()
|
||||||
|
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
|
||||||
|
|
||||||
|
Sets.SetView<String> addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet);
|
||||||
|
Sets.SetView<String> deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet);
|
||||||
|
|
||||||
|
if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Service status is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView);
|
||||||
|
|
||||||
|
// Trigger reload of gateway route cache.
|
||||||
|
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE) {
|
||||||
|
if (oldValue instanceof ServiceInstancesByProto && newValue instanceof ServiceInstancesByProto) {
|
||||||
|
LOG.debug("receive service instances={} change event", svcEventKey);
|
||||||
|
ServiceInstancesByProto oldIns = (ServiceInstancesByProto) oldValue;
|
||||||
|
ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue;
|
||||||
|
if ((CollectionUtils.isEmpty(oldIns.getInstances()) && !CollectionUtils.isEmpty(newIns.getInstances())) ||
|
||||||
|
(!CollectionUtils.isEmpty(oldIns.getInstances()) && CollectionUtils.isEmpty(newIns.getInstances()))) {
|
||||||
|
LOG.info("Service status of {} is update.", newIns.getService());
|
||||||
|
|
||||||
|
// Trigger reload of gateway route cache.
|
||||||
|
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
||||||
|
this.publisher = applicationEventPublisher;
|
||||||
|
}
|
||||||
|
}
|
@ -1,62 +0,0 @@
|
|||||||
package com.tencent.cloud.polaris.registry;
|
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
|
|
||||||
import com.tencent.polaris.api.pojo.RegistryCacheValue;
|
|
||||||
import com.tencent.polaris.api.pojo.ServiceEventKey;
|
|
||||||
import com.tencent.polaris.client.pojo.ServicesByProto;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
|
|
||||||
import org.springframework.context.ApplicationEventPublisher;
|
|
||||||
import org.springframework.context.ApplicationEventPublisherAware;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Change listener of Polaris service info.
|
|
||||||
*
|
|
||||||
* @author Haotian Zhang
|
|
||||||
*/
|
|
||||||
public class PolarisServiceChangeListener extends AbstractResourceEventListener implements ApplicationEventPublisherAware {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(PolarisServiceChangeListener.class);
|
|
||||||
|
|
||||||
private static final AtomicInteger INDEX = new AtomicInteger(0);
|
|
||||||
|
|
||||||
private ApplicationEventPublisher publisher;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue,
|
|
||||||
RegistryCacheValue newValue) {
|
|
||||||
if (newValue.getEventType() != ServiceEventKey.EventType.SERVICE) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (oldValue instanceof ServicesByProto && newValue instanceof ServicesByProto) {
|
|
||||||
LOG.debug("receive service={} change event", svcEventKey);
|
|
||||||
Set<String> oldServiceInfoSet = ((ServicesByProto) oldValue).getServices().stream()
|
|
||||||
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
|
|
||||||
Set<String> newServiceInfoSet = ((ServicesByProto) newValue).getServices().stream()
|
|
||||||
.map(i -> i.getNamespace() + "::" + i.getService()).collect(Collectors.toSet());
|
|
||||||
|
|
||||||
Sets.SetView<String> addServiceInfoSetView = Sets.difference(newServiceInfoSet, oldServiceInfoSet);
|
|
||||||
Sets.SetView<String> deleteServiceInfoSetView = Sets.difference(oldServiceInfoSet, newServiceInfoSet);
|
|
||||||
|
|
||||||
if (addServiceInfoSetView.isEmpty() && deleteServiceInfoSetView.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
LOG.info("Service info is update. Add service of {}. Delete service of {}", addServiceInfoSetView, deleteServiceInfoSetView);
|
|
||||||
|
|
||||||
// Trigger reload of gateway route cache.
|
|
||||||
this.publisher.publishEvent(new HeartbeatEvent(this, INDEX.getAndIncrement()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
|
||||||
this.publisher = applicationEventPublisher;
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in new issue