diff --git a/CHANGELOG.md b/CHANGELOG.md index d22859a1..629fbb1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,3 +6,4 @@ - [feat:support webclient and gateway report call metrics](https://github.com/Tencent/spring-cloud-tencent/pull/946) - [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/949) - [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/957) +- [refactor:refactor reactor code in router module.](https://github.com/Tencent/spring-cloud-tencent/pull/959) diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java index 3d9bf1a4..da5377b6 100644 --- a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancerClientConfiguration.java @@ -18,9 +18,14 @@ package com.tencent.cloud.polaris.loadbalancer; import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled; +import com.tencent.cloud.polaris.loadbalancer.reactive.PolarisLoadBalancerClientRequestTransformer; +import com.tencent.polaris.api.core.ConsumerAPI; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.factory.api.DiscoveryAPIFactory; import com.tencent.polaris.router.api.core.RouterAPI; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled; @@ -29,6 +34,7 @@ import org.springframework.cloud.client.ConditionalOnReactiveDiscoveryEnabled; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; +import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer; import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer; import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; @@ -65,6 +71,14 @@ public class PolarisLoadBalancerClientConfiguration { loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), routerAPI); } + @Bean + @ConditionalOnMissingBean + @ConditionalOnClass(name = "org.springframework.web.reactive.function.client.ClientRequest") + public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) { + ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext); + return new PolarisLoadBalancerClientRequestTransformer(consumerAPI); + } + @Configuration(proxyBeanMethods = false) @ConditionalOnReactiveDiscoveryEnabled @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER) diff --git a/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java b/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java index e2947a31..1dfe01fc 100644 --- a/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java +++ b/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/RouterUtils.java @@ -17,11 +17,10 @@ package com.tencent.cloud.polaris.router; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import com.tencent.cloud.common.metadata.MetadataContext; @@ -30,6 +29,7 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceKey; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -57,33 +57,19 @@ public final class RouterUtils { * @return ServiceInstances */ public static ServiceInstances transferServersToServiceInstances(Flux> servers, InstanceTransformer instanceTransformer) { - CountDownLatch latch = new CountDownLatch(1); - - AtomicReference> instancesRef = new AtomicReference<>(); - servers.subscribe(serviceInstances -> { - instancesRef.set(serviceInstances - .stream() - .map(instanceTransformer::transform) - .collect(Collectors.toList())); - - latch.countDown(); - }); - - try { - latch.await(WAIT_TIME, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - LOGGER.error("Wait get instance result error. ", e); - } + List instanceList = Collections.synchronizedList(new ArrayList<>()); + servers.flatMap((Function, Publisher>) serviceInstances -> + Flux.fromIterable(serviceInstances.stream() + .map(instanceTransformer::transform) + .collect(Collectors.toList()))).subscribe(instance -> instanceList.add((Instance) instance)); String serviceName = ""; - if (!CollectionUtils.isEmpty(instancesRef.get())) { - serviceName = instancesRef.get().get(0).getService(); + if (!CollectionUtils.isEmpty(instanceList)) { + serviceName = instanceList.get(0).getService(); } ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); - List instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get(); - return new DefaultServiceInstances(serviceKey, instances); + return new DefaultServiceInstances(serviceKey, instanceList); } } diff --git a/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 7bb1d659..5c0ddb2d 100644 --- a/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-cloud-tencent-polaris-context/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -38,7 +38,13 @@ "name": "spring.cloud.polaris.local-ip-address", "type": "java.lang.String", "defaultValue": "", - "description": "current server local ip address." + "description": "current server local ip address to be registered." + }, + { + "name": "spring.cloud.polaris.local-port", + "type": "java.lang.Integer", + "defaultValue": "", + "description": "current server local port to be registered." } ], "hints": []