diff --git a/CHANGELOG.md b/CHANGELOG.md index 671ece8fb..89f245350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1,4 @@ # Change Log --- + +- [Bugfix: fix reactor concurrent bug when get instances & fix spring-retry fuse not working bug](https://github.com/Tencent/spring-cloud-tencent/pull/771) diff --git a/changes/changes-1.8.2.md b/changes/changes-1.8.2.md deleted file mode 100644 index 6349c0898..000000000 --- a/changes/changes-1.8.2.md +++ /dev/null @@ -1,4 +0,0 @@ -# Change Log ---- - -- [Bugfix: get service instances by Flux.blockLast() to resolve concurrent problem](https://github.com/Tencent/spring-cloud-tencent/pull/762) diff --git a/pom.xml b/pom.xml index 831685d24..1e069e8b6 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ - 1.8.2-2020.0.5 + 1.8.2-2020.0.5-SNAPSHOT 2020.0.5 diff --git a/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java b/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java index d6abcdf69..e4fc2934c 100644 --- a/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java +++ b/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java @@ -19,6 +19,7 @@ package com.tencent.cloud.common.pojo; import java.net.URI; import java.util.Map; +import java.util.Objects; import com.tencent.polaris.api.pojo.Instance; import org.apache.commons.lang.StringUtils; @@ -93,4 +94,21 @@ public class PolarisServiceInstance implements ServiceInstance { public String getScheme() { return this.scheme; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PolarisServiceInstance that = (PolarisServiceInstance) o; + return Objects.equals(instance, that.instance) && Objects.equals(scheme, that.scheme); + } + + @Override + public int hashCode() { + return Objects.hash(instance, scheme); + } } diff --git a/spring-cloud-tencent-dependencies/pom.xml b/spring-cloud-tencent-dependencies/pom.xml index 60ef292f0..25c9be53f 100644 --- a/spring-cloud-tencent-dependencies/pom.xml +++ b/spring-cloud-tencent-dependencies/pom.xml @@ -70,7 +70,7 @@ - 1.8.2-2020.0.5 + 1.8.2-2020.0.5-SNAPSHOT 1.9.1 diff --git a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java index df1c8fc48..820c7a0b3 100644 --- a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java +++ b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java @@ -18,8 +18,12 @@ package com.tencent.cloud.polaris.loadbalancer; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.pojo.PolarisServiceInstance; @@ -28,6 +32,8 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import org.springframework.cloud.client.ServiceInstance; @@ -39,8 +45,10 @@ import org.springframework.util.CollectionUtils; * @author lepdou 2022-05-17 */ public final class LoadBalancerUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(LoadBalancerUtils.class); private static final int DEFAULT_WEIGHT = 100; + private static final int WAIT_TIME = 3; private LoadBalancerUtils() { } @@ -52,21 +60,33 @@ public final class LoadBalancerUtils { * @return ServiceInstances */ public static ServiceInstances transferServersToServiceInstances(Flux> servers) { - List serviceInstances = servers.blockLast(); + CountDownLatch latch = new CountDownLatch(1); - List instances = new ArrayList<>(); - if (!CollectionUtils.isEmpty(serviceInstances)) { - for (ServiceInstance serviceInstance : serviceInstances) { - instances.add(transferServerToServiceInstance(serviceInstance)); - } + AtomicReference> instancesRef = new AtomicReference<>(); + servers.subscribe(serviceInstances -> { + instancesRef.set(serviceInstances + .stream() + .map(LoadBalancerUtils::transferServerToServiceInstance) + .collect(Collectors.toList())); + + latch.countDown(); + }); + + try { + latch.await(WAIT_TIME, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + LOGGER.error("Wait get instance result error. ", e); } String serviceName = ""; - if (!CollectionUtils.isEmpty(instances)) { - serviceName = instances.get(0).getService(); + if (!CollectionUtils.isEmpty(instancesRef.get())) { + serviceName = instancesRef.get().get(0).getService(); } ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); + List instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get(); + return new DefaultServiceInstances(serviceKey, instances); }