refactor:refactor reactor code in router module.

pull/955/head
Haotian Zhang 2 years ago
parent f4d4e03cd9
commit 806a600a26

@ -1,9 +1,10 @@
# Change Log # Change Log
--- ---
- [feature: optimize polaris-ratelimit-example, add caller query params and headers, add callee custom label resolver.](https://github.com/Tencent/spring-cloud-tencent/pull/951)
- [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/923) - [feature: add config for customized local port.](https://github.com/Tencent/spring-cloud-tencent/pull/923)
- [feat:support webclient and gateway report call metrics](https://github.com/Tencent/spring-cloud-tencent/pull/924) - [feat:support webclient and gateway report call metrics](https://github.com/Tencent/spring-cloud-tencent/pull/924)
- [feature: optimize polaris-discovery-example/discovery-callee-service, add client-ip return.](https://github.com/Tencent/spring-cloud-tencent/pull/939) - [feature: optimize polaris-discovery-example/discovery-callee-service, add client-ip return.](https://github.com/Tencent/spring-cloud-tencent/pull/939)
- [docs:prevent the release of the final version of the sdk.](https://github.com/Tencent/spring-cloud-tencent/pull/943) - [docs:prevent the release of the final version of the sdk.](https://github.com/Tencent/spring-cloud-tencent/pull/943)
- [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/948) - [fix: fix nacos CircuitBreaker disable bug.](https://github.com/Tencent/spring-cloud-tencent/pull/948)
- [feature: optimize polaris-ratelimit-example, add caller query params and headers, add callee custom label resolver.](https://github.com/Tencent/spring-cloud-tencent/pull/951)
- [refactor:refactor reactor code in router module.](https://github.com/Tencent/spring-cloud-tencent/pull/955)

@ -25,6 +25,7 @@ import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.router.api.core.RouterAPI; import com.tencent.polaris.router.api.core.RouterAPI;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled; import org.springframework.cloud.client.ConditionalOnBlockingDiscoveryEnabled;
@ -72,6 +73,7 @@ public class PolarisLoadBalancerClientConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.web.reactive.function.client.ClientRequest")
public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) { public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) {
ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext); ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
return new PolarisLoadBalancerClientRequestTransformer(consumerAPI); return new PolarisLoadBalancerClientRequestTransformer(consumerAPI);

@ -17,11 +17,10 @@
package com.tencent.cloud.polaris.router; package com.tencent.cloud.polaris.router;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.function.Function;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.metadata.MetadataContext;
@ -30,6 +29,7 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances;
import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceInstances; import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.pojo.ServiceKey;
import org.reactivestreams.Publisher;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -57,33 +57,19 @@ public final class RouterUtils {
* @return ServiceInstances * @return ServiceInstances
*/ */
public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) { public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) {
CountDownLatch latch = new CountDownLatch(1); List<Instance> instanceList = Collections.synchronizedList(new ArrayList<>());
servers.flatMap((Function<List<ServiceInstance>, Publisher<?>>) serviceInstances ->
AtomicReference<List<Instance>> instancesRef = new AtomicReference<>(); Flux.fromIterable(serviceInstances.stream()
servers.subscribe(serviceInstances -> { .map(instanceTransformer::transform)
instancesRef.set(serviceInstances .collect(Collectors.toList()))).subscribe(instance -> instanceList.add((Instance) instance));
.stream()
.map(instanceTransformer::transform)
.collect(Collectors.toList()));
latch.countDown();
});
try {
latch.await(WAIT_TIME, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
LOGGER.error("Wait get instance result error. ", e);
}
String serviceName = ""; String serviceName = "";
if (!CollectionUtils.isEmpty(instancesRef.get())) { if (!CollectionUtils.isEmpty(instanceList)) {
serviceName = instancesRef.get().get(0).getService(); serviceName = instanceList.get(0).getService();
} }
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName);
List<Instance> instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get();
return new DefaultServiceInstances(serviceKey, instances); return new DefaultServiceInstances(serviceKey, instanceList);
} }
} }

Loading…
Cancel
Save