fix reactor concurrent bug when get instances (#775)

pull/786/head
lepdou 2 years ago committed by GitHub
parent a092d5396f
commit 22b11a989f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,4 +18,5 @@
- [Bugfix: get service instances by Flux.blockLast() to resolve concurrent problem](https://github.com/Tencent/spring-cloud-tencent/pull/763) - [Bugfix: get service instances by Flux.blockLast() to resolve concurrent problem](https://github.com/Tencent/spring-cloud-tencent/pull/763)
- [Test:add sct-stater-polaris-router juint.](https://github.com/Tencent/spring-cloud-tencent/pull/767) - [Test:add sct-stater-polaris-router juint.](https://github.com/Tencent/spring-cloud-tencent/pull/767)
- [Feature: support nacos registry](https://github.com/Tencent/spring-cloud-tencent/pull/757) - [Feature: support nacos registry](https://github.com/Tencent/spring-cloud-tencent/pull/757)
- [Bugfix: fix reactor concurrent bug when get instances & fix spring-retry fuse not working bug](https://github.com/Tencent/spring-cloud-tencent/pull/775)
- [Fix issue: prompt no registration if web dependence was not imported.](https://github.com/Tencent/spring-cloud-tencent/pull/777) - [Fix issue: prompt no registration if web dependence was not imported.](https://github.com/Tencent/spring-cloud-tencent/pull/777)

@ -19,6 +19,7 @@ package com.tencent.cloud.common.pojo;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.Instance;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -93,4 +94,21 @@ public class PolarisServiceInstance implements ServiceInstance {
public String getScheme() { public String getScheme() {
return this.scheme; return this.scheme;
} }
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PolarisServiceInstance that = (PolarisServiceInstance) o;
return Objects.equals(instance, that.instance) && Objects.equals(scheme, that.scheme);
}
@Override
public int hashCode() {
return Objects.hash(instance, scheme);
}
} }

@ -18,8 +18,12 @@
package com.tencent.cloud.polaris.loadbalancer; package com.tencent.cloud.polaris.loadbalancer;
import java.util.ArrayList; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.pojo.PolarisServiceInstance; import com.tencent.cloud.common.pojo.PolarisServiceInstance;
@ -28,6 +32,8 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances;
import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.pojo.ServiceKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
@ -39,8 +45,10 @@ import org.springframework.util.CollectionUtils;
* @author lepdou 2022-05-17 * @author lepdou 2022-05-17
*/ */
public final class LoadBalancerUtils { public final class LoadBalancerUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadBalancerUtils.class);
private static final int DEFAULT_WEIGHT = 100; private static final int DEFAULT_WEIGHT = 100;
private static final int WAIT_TIME = 3;
private LoadBalancerUtils() { private LoadBalancerUtils() {
} }
@ -52,21 +60,33 @@ public final class LoadBalancerUtils {
* @return ServiceInstances * @return ServiceInstances
*/ */
public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers) { public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers) {
List<ServiceInstance> serviceInstances = servers.blockLast(); CountDownLatch latch = new CountDownLatch(1);
List<Instance> instances = new ArrayList<>(); AtomicReference<List<Instance>> instancesRef = new AtomicReference<>();
if (!CollectionUtils.isEmpty(serviceInstances)) { servers.subscribe(serviceInstances -> {
for (ServiceInstance serviceInstance : serviceInstances) { instancesRef.set(serviceInstances
instances.add(transferServerToServiceInstance(serviceInstance)); .stream()
} .map(LoadBalancerUtils::transferServerToServiceInstance)
.collect(Collectors.toList()));
latch.countDown();
});
try {
latch.await(WAIT_TIME, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
LOGGER.error("Wait get instance result error. ", e);
} }
String serviceName = null; String serviceName = "";
if (!CollectionUtils.isEmpty(instances)) { if (!CollectionUtils.isEmpty(instancesRef.get())) {
serviceName = instances.get(0).getService(); serviceName = instancesRef.get().get(0).getService();
} }
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName);
List<Instance> instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get();
return new DefaultServiceInstances(serviceKey, instances); return new DefaultServiceInstances(serviceKey, instances);
} }

Loading…
Cancel
Save