From 22b11a989f4cf2df96c371e94c9c607134f5683a Mon Sep 17 00:00:00 2001 From: lepdou Date: Wed, 14 Dec 2022 13:21:19 +0800 Subject: [PATCH] fix reactor concurrent bug when get instances (#775) --- CHANGELOG.md | 1 + .../common/pojo/PolarisServiceInstance.java | 18 +++++++++ .../loadbalancer/LoadBalancerUtils.java | 40 ++++++++++++++----- 3 files changed, 49 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 722625864..ec3cb71e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,4 +18,5 @@ - [Bugfix: get service instances by Flux.blockLast() to resolve concurrent problem](https://github.com/Tencent/spring-cloud-tencent/pull/763) - [Test:add sct-stater-polaris-router juint.](https://github.com/Tencent/spring-cloud-tencent/pull/767) - [Feature: support nacos registry](https://github.com/Tencent/spring-cloud-tencent/pull/757) +- [Bugfix: fix reactor concurrent bug when get instances & fix spring-retry fuse not working bug](https://github.com/Tencent/spring-cloud-tencent/pull/775) - [Fix issue: prompt no registration if web dependence was not imported.](https://github.com/Tencent/spring-cloud-tencent/pull/777) diff --git a/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java b/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java index d6abcdf69..e4fc2934c 100644 --- a/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java +++ b/spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/pojo/PolarisServiceInstance.java @@ -19,6 +19,7 @@ package com.tencent.cloud.common.pojo; import java.net.URI; import java.util.Map; +import java.util.Objects; import com.tencent.polaris.api.pojo.Instance; import org.apache.commons.lang.StringUtils; @@ -93,4 +94,21 @@ public class PolarisServiceInstance implements ServiceInstance { public String getScheme() { return this.scheme; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PolarisServiceInstance that = (PolarisServiceInstance) o; + return Objects.equals(instance, that.instance) && Objects.equals(scheme, that.scheme); + } + + @Override + public int hashCode() { + return Objects.hash(instance, scheme); + } } diff --git a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java index af18a36fc..820c7a0b3 100644 --- a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java +++ b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java @@ -18,8 +18,12 @@ package com.tencent.cloud.polaris.loadbalancer; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.pojo.PolarisServiceInstance; @@ -28,6 +32,8 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import org.springframework.cloud.client.ServiceInstance; @@ -39,8 +45,10 @@ import org.springframework.util.CollectionUtils; * @author lepdou 2022-05-17 */ public final class LoadBalancerUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(LoadBalancerUtils.class); private static final int DEFAULT_WEIGHT = 100; + private static final int WAIT_TIME = 3; private LoadBalancerUtils() { } @@ -52,21 +60,33 @@ public final class LoadBalancerUtils { * @return ServiceInstances */ public static ServiceInstances transferServersToServiceInstances(Flux> servers) { - List serviceInstances = servers.blockLast(); + CountDownLatch latch = new CountDownLatch(1); - List instances = new ArrayList<>(); - if (!CollectionUtils.isEmpty(serviceInstances)) { - for (ServiceInstance serviceInstance : serviceInstances) { - instances.add(transferServerToServiceInstance(serviceInstance)); - } + AtomicReference> instancesRef = new AtomicReference<>(); + servers.subscribe(serviceInstances -> { + instancesRef.set(serviceInstances + .stream() + .map(LoadBalancerUtils::transferServerToServiceInstance) + .collect(Collectors.toList())); + + latch.countDown(); + }); + + try { + latch.await(WAIT_TIME, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + LOGGER.error("Wait get instance result error. ", e); } - String serviceName = null; - if (!CollectionUtils.isEmpty(instances)) { - serviceName = instances.get(0).getService(); + String serviceName = ""; + if (!CollectionUtils.isEmpty(instancesRef.get())) { + serviceName = instancesRef.get().get(0).getService(); } ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); + List instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get(); + return new DefaultServiceInstances(serviceKey, instances); }