refactor:refactor reactor code in router module. (#955)

pull/970/head
Haotian Zhang 2 years ago committed by GitHub
parent f4d4e03cd9
commit 4ff4459d6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,9 +1,10 @@
# Change Log # Change Log
--- ---
- [feature: optimize polaris-ratelimit-example, add caller query params and headers, add callee custom label resolver.](https://github.com/Tencent/spring-cloud-tencent/pull/951)
- [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/923) - [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/923)
- [feat:support webclient and gateway report call metrics](https://github.com/Tencent/spring-cloud-tencent/pull/924) - [feat:support webclient and gateway report call metrics](https://github.com/Tencent/spring-cloud-tencent/pull/924)
- [feature: optimize polaris-discovery-example/discovery-callee-service, add client-ip return.](https://github.com/Tencent/spring-cloud-tencent/pull/939) - [feature: optimize polaris-discovery-example/discovery-callee-service, add client-ip return.](https://github.com/Tencent/spring-cloud-tencent/pull/939)
- [docs:prevent the release of the final version of the sdk.](https://github.com/Tencent/spring-cloud-tencent/pull/943) - [docs:prevent the release of the final version of the sdk.](https://github.com/Tencent/spring-cloud-tencent/pull/943)
- [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/948) - [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/948)
- [feature: optimize polaris-ratelimit-example, add caller query params and headers, add callee custom label resolver.](https://github.com/Tencent/spring-cloud-tencent/pull/951)
- [refactor:refactor reactor code in router module.](https://github.com/Tencent/spring-cloud-tencent/pull/955)

@ -25,6 +25,7 @@ import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.router.api.core.RouterAPI; import com.tencent.polaris.router.api.core.RouterAPI;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled; import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled;
@ -72,6 +73,7 @@ public class PolarisLoadBalancerClientConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.web.reactive.function.client.ClientRequest")
public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) { public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) {
ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext); ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
return new PolarisLoadBalancerClientRequestTransformer(consumerAPI); return new PolarisLoadBalancerClientRequestTransformer(consumerAPI);

@ -17,11 +17,10 @@
package com.tencent.cloud.polaris.router; package com.tencent.cloud.polaris.router;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.function.Function;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.metadata.MetadataContext;
@ -30,6 +29,7 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances;
import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.pojo.ServiceKey;
import org.reactivestreams.Publisher;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -57,33 +57,19 @@ public final class RouterUtils {
* @return ServiceInstances * @return ServiceInstances
*/ */
public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) { public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) {
CountDownLatch latch = new CountDownLatch(1); List<Instance> instanceList = Collections.synchronizedList(new ArrayList<>());
servers.flatMap((Function<List<ServiceInstance>, Publisher<?>>) serviceInstances ->
AtomicReference<List<Instance>> instancesRef = new AtomicReference<>(); Flux.fromIterable(serviceInstances.stream()
servers.subscribe(serviceInstances -> {
instancesRef.set(serviceInstances
.stream()
.map(instanceTransformer::transform) .map(instanceTransformer::transform)
.collect(Collectors.toList())); .collect(Collectors.toList()))).subscribe(instance -> instanceList.add((Instance) instance));
latch.countDown();
});
try {
latch.await(WAIT_TIME, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
LOGGER.error("Wait get instance result error. ", e);
}
String serviceName = ""; String serviceName = "";
if (!CollectionUtils.isEmpty(instancesRef.get())) { if (!CollectionUtils.isEmpty(instanceList)) {
serviceName = instancesRef.get().get(0).getService(); serviceName = instanceList.get(0).getService();
} }
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName);
List<Instance> instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get();
return new DefaultServiceInstances(serviceKey, instances); return new DefaultServiceInstances(serviceKey, instanceList);
} }
} }

@ -38,7 +38,13 @@
"name": "spring.cloud.polaris.local-ip-address", "name": "spring.cloud.polaris.local-ip-address",
"type": "java.lang.String", "type": "java.lang.String",
"defaultValue": "", "defaultValue": "",
"description": "current server local ip address." "description": "current server local ip address to be registered."
},
{
"name": "spring.cloud.polaris.local-port",
"type": "java.lang.Integer",
"defaultValue": "",
"description": "current server local port to be registered."
} }
], ],
"hints": [] "hints": []

Loading…
Cancel
Save