diff --git a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java index 91078a05e..5ab7660e1 100644 --- a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java +++ b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java @@ -18,10 +18,9 @@ package com.tencent.cloud.polaris.loadbalancer; +import java.time.Duration; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -32,8 +31,6 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceKey; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import org.springframework.cloud.client.ServiceInstance; @@ -45,8 +42,6 @@ import org.springframework.util.CollectionUtils; * @author lepdou 2022-05-17 */ public final class LoadBalancerUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(LoadBalancerUtils.class); - private static final int WAIT_TIME = 3; private LoadBalancerUtils() { @@ -59,25 +54,11 @@ public final class LoadBalancerUtils { * @return ServiceInstances */ public static ServiceInstances transferServersToServiceInstances(Flux> servers) { - CountDownLatch latch = new CountDownLatch(1); - - AtomicReference> instancesRef = new AtomicReference<>(); - servers.subscribe(serviceInstances -> { - instancesRef.set(serviceInstances - .stream() - .map(LoadBalancerUtils::transferServerToServiceInstance) - .collect(Collectors.toList())); - - latch.countDown(); - }); - - try { - latch.await(WAIT_TIME, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - LOGGER.error("Wait get instance result error. ", e); - } - + AtomicReference> instancesRef = servers.map(serviceInstances -> serviceInstances + .stream() + .map(LoadBalancerUtils::transferServerToServiceInstance) + .collect(Collectors.toList())).collect(AtomicReference>::new, AtomicReference::set) + .block(Duration.ofSeconds(WAIT_TIME)); String serviceName = ""; if (!CollectionUtils.isEmpty(instancesRef.get())) { serviceName = instancesRef.get().get(0).getService(); @@ -95,7 +76,7 @@ public final class LoadBalancerUtils { * @param serviceInstance serviceInstance * @return defaultInstance */ - public static DefaultInstance transferServerToServiceInstance(ServiceInstance serviceInstance) { + public static Instance transferServerToServiceInstance(ServiceInstance serviceInstance) { DefaultInstance instance = new DefaultInstance(); instance.setNamespace(MetadataContext.LOCAL_NAMESPACE); instance.setService(serviceInstance.getServiceId());