refactor:refactor reactor code in router module. (#959)

pull/991/head
Haotian Zhang 2 years ago committed by GitHub
parent 5cb043f6c6
commit 8869e901b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,3 +6,4 @@
- [feat:support webclient and gateway report call metrics](https://github.com/Tencent/spring-cloud-tencent/pull/946) - [feat:support webclient and gateway report call metrics](https://github.com/Tencent/spring-cloud-tencent/pull/946)
- [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/949) - [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/949)
- [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/957) - [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/957)
- [refactor:refactor reactor code in router module.](https://github.com/Tencent/spring-cloud-tencent/pull/959)

@ -18,9 +18,14 @@
package com.tencent.cloud.polaris.loadbalancer; package com.tencent.cloud.polaris.loadbalancer;
import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled; import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled;
import com.tencent.cloud.polaris.loadbalancer.reactive.PolarisLoadBalancerClientRequestTransformer;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.router.api.core.RouterAPI; import com.tencent.polaris.router.api.core.RouterAPI;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled; import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled;
@ -29,6 +34,7 @@ import org.springframework.cloud.client.ConditionalOnReactiveDiscoveryEnabled;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer; import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
@ -65,6 +71,14 @@ public class PolarisLoadBalancerClientConfiguration {
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), routerAPI); loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), routerAPI);
} }
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.web.reactive.function.client.ClientRequest")
public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) {
ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
return new PolarisLoadBalancerClientRequestTransformer(consumerAPI);
}
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled @ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER) @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)

@ -17,11 +17,10 @@
package com.tencent.cloud.polaris.router; package com.tencent.cloud.polaris.router;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.function.Function;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.metadata.MetadataContext;
@ -30,6 +29,7 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances;
import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.pojo.ServiceKey;
import org.reactivestreams.Publisher;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -57,33 +57,19 @@ public final class RouterUtils {
* @return ServiceInstances * @return ServiceInstances
*/ */
public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) { public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) {
CountDownLatch latch = new CountDownLatch(1); List<Instance> instanceList = Collections.synchronizedList(new ArrayList<>());
servers.flatMap((Function<List<ServiceInstance>, Publisher<?>>) serviceInstances ->
AtomicReference<List<Instance>> instancesRef = new AtomicReference<>(); Flux.fromIterable(serviceInstances.stream()
servers.subscribe(serviceInstances -> { .map(instanceTransformer::transform)
instancesRef.set(serviceInstances .collect(Collectors.toList()))).subscribe(instance -> instanceList.add((Instance) instance));
.stream()
.map(instanceTransformer::transform)
.collect(Collectors.toList()));
latch.countDown();
});
try {
latch.await(WAIT_TIME, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
LOGGER.error("Wait get instance result error. ", e);
}
String serviceName = ""; String serviceName = "";
if (!CollectionUtils.isEmpty(instancesRef.get())) { if (!CollectionUtils.isEmpty(instanceList)) {
serviceName = instancesRef.get().get(0).getService(); serviceName = instanceList.get(0).getService();
} }
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName);
List<Instance> instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get();
return new DefaultServiceInstances(serviceKey, instances); return new DefaultServiceInstances(serviceKey, instanceList);
} }
} }

@ -38,7 +38,13 @@
"name": "spring.cloud.polaris.local-ip-address", "name": "spring.cloud.polaris.local-ip-address",
"type": "java.lang.String", "type": "java.lang.String",
"defaultValue": "", "defaultValue": "",
"description": "current server local ip address." "description": "current server local ip address to be registered."
},
{
"name": "spring.cloud.polaris.local-port",
"type": "java.lang.Integer",
"defaultValue": "",
"description": "current server local port to be registered."
} }
], ],
"hints": [] "hints": []

Loading…
Cancel
Save