diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e57251d0..1ae35736a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,3 +6,4 @@ - [docs:prevent the release of the final version of the sdk.](https://github.com/Tencent/spring-cloud-tencent/pull/945) - [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/947) - [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/956) +- [refactor:refactor reactor code in router module.](https://github.com/Tencent/spring-cloud-tencent/pull/958) diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java index cce8b0a9f..da5377b63 100644 --- a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java @@ -25,6 +25,7 @@ import com.tencent.polaris.factory.api.DiscoveryAPIFactory; import com.tencent.polaris.router.api.core.RouterAPI; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled; @@ -72,6 +73,7 @@ public class PolarisLoadBalancerClientConfiguration { @Bean @ConditionalOnMissingBean + @ConditionalOnClass(name = "org.springframework.web.reactive.function.client.ClientRequest") public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) { ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext); return new PolarisLoadBalancerClientRequestTransformer(consumerAPI); diff --git a/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java b/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java index e2947a317..1dfe01fc9 100644 --- a/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java +++ b/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java @@ -17,11 +17,10 @@ package com.tencent.cloud.polaris.router; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import com.tencent.cloud.common.metadata.MetadataContext; @@ -30,6 +29,7 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceKey; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -57,33 +57,19 @@ public final class RouterUtils { * @return ServiceInstances */ public static ServiceInstances transferServersToServiceInstances(Flux> servers, InstanceTransformer instanceTransformer) { - CountDownLatch latch = new CountDownLatch(1); - - AtomicReference> instancesRef = new AtomicReference<>(); - servers.subscribe(serviceInstances -> { - instancesRef.set(serviceInstances - .stream() - .map(instanceTransformer::transform) - .collect(Collectors.toList())); - - latch.countDown(); - }); - - try { - latch.await(WAIT_TIME, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - LOGGER.error("Wait get instance result error. ", e); - } + List instanceList = Collections.synchronizedList(new ArrayList<>()); + servers.flatMap((Function, Publisher>) serviceInstances -> + Flux.fromIterable(serviceInstances.stream() + .map(instanceTransformer::transform) + .collect(Collectors.toList()))).subscribe(instance -> instanceList.add((Instance) instance)); String serviceName = ""; - if (!CollectionUtils.isEmpty(instancesRef.get())) { - serviceName = instancesRef.get().get(0).getService(); + if (!CollectionUtils.isEmpty(instanceList)) { + serviceName = instanceList.get(0).getService(); } ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); - List instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get(); - return new DefaultServiceInstances(serviceKey, instances); + return new DefaultServiceInstances(serviceKey, instanceList); } } diff --git a/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 7bb1d6596..5c0ddb2da 100644 --- a/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -38,7 +38,13 @@ "name": "spring.cloud.polaris.local-ip-address", "type": "java.lang.String", "defaultValue": "", - "description": "current server local ip address." + "description": "current server local ip address to be registered." + }, + { + "name": "spring.cloud.polaris.local-port", + "type": "java.lang.Integer", + "defaultValue": "", + "description": "current server local port to be registered." } ], "hints": []