From 6e8e5f72687e7b0d14a6403ea621c99eb4b15f59 Mon Sep 17 00:00:00 2001 From: lepdou Date: Wed, 7 Dec 2022 15:54:45 +0800 Subject: [PATCH] get service instances by Flux.blockLast() to resolve concurrent problem (#764) --- CHANGELOG.md | 1 + .../loadbalancer/LoadBalancerUtils.java | 76 +++++++++++-------- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 156408186..ef9b9be62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,3 +18,4 @@ - [fix:fix discovery junit.](https://github.com/Tencent/spring-cloud-tencent/pull/730) - [adapt polaris-java 1.10.1 version](https://github.com/Tencent/spring-cloud-tencent/pull/746) - [Optimize: change RouteArgument.buildCustom to RouteArgument.fromLabel](https://github.com/Tencent/spring-cloud-tencent/pull/749) +- [Optimize: get service instances by Flux.blockLast() to resolve concurrent problem](https://github.com/Tencent/spring-cloud-tencent/pull/764) diff --git a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java index e4f837254..af18a36fc 100644 --- a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java +++ b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/LoadBalancerUtils.java @@ -18,10 +18,8 @@ package com.tencent.cloud.polaris.loadbalancer; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.pojo.PolarisServiceInstance; @@ -42,44 +40,60 @@ import org.springframework.util.CollectionUtils; */ public final class LoadBalancerUtils { + private static final int DEFAULT_WEIGHT = 100; + private LoadBalancerUtils() { } + /** + * transfer servers to ServiceInstances. + * + * @param servers servers + * @return ServiceInstances + */ public static ServiceInstances transferServersToServiceInstances(Flux> servers) { - AtomicReference> instances = new AtomicReference<>(); - servers.subscribe(serviceInstances -> { - instances.set(serviceInstances.stream().map(serviceInstance -> { - DefaultInstance instance = new DefaultInstance(); - instance.setNamespace(MetadataContext.LOCAL_NAMESPACE); - instance.setService(serviceInstance.getServiceId()); - instance.setProtocol(serviceInstance.getScheme()); - instance.setId(serviceInstance.getInstanceId()); - instance.setHost(serviceInstance.getHost()); - instance.setPort(serviceInstance.getPort()); - instance.setWeight(100); - instance.setMetadata(serviceInstance.getMetadata()); - - if (serviceInstance instanceof PolarisServiceInstance) { - PolarisServiceInstance polarisServiceInstance = (PolarisServiceInstance) serviceInstance; - instance.setRegion(polarisServiceInstance.getPolarisInstance().getRegion()); - instance.setZone(polarisServiceInstance.getPolarisInstance().getZone()); - instance.setCampus(polarisServiceInstance.getPolarisInstance().getCampus()); - } + List serviceInstances = servers.blockLast(); - return instance; - }).collect(Collectors.toList())); - }); + List instances = new ArrayList<>(); + if (!CollectionUtils.isEmpty(serviceInstances)) { + for (ServiceInstance serviceInstance : serviceInstances) { + instances.add(transferServerToServiceInstance(serviceInstance)); + } + } String serviceName = null; - if (CollectionUtils.isEmpty(instances.get())) { - instances.set(Collections.emptyList()); - } - else { - serviceName = instances.get().get(0).getService(); + if (!CollectionUtils.isEmpty(instances)) { + serviceName = instances.get(0).getService(); } ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); + return new DefaultServiceInstances(serviceKey, instances); + } + + /** + * transfer ServiceInstance to DefaultInstance. + * + * @param serviceInstance serviceInstance + * @return defaultInstance + */ + public static DefaultInstance transferServerToServiceInstance(ServiceInstance serviceInstance) { + DefaultInstance instance = new DefaultInstance(); + instance.setNamespace(MetadataContext.LOCAL_NAMESPACE); + instance.setService(serviceInstance.getServiceId()); + instance.setProtocol(serviceInstance.getScheme()); + instance.setId(serviceInstance.getInstanceId()); + instance.setHost(serviceInstance.getHost()); + instance.setPort(serviceInstance.getPort()); + instance.setWeight(DEFAULT_WEIGHT); + instance.setMetadata(serviceInstance.getMetadata()); + + if (serviceInstance instanceof PolarisServiceInstance) { + PolarisServiceInstance polarisServiceInstance = (PolarisServiceInstance) serviceInstance; + instance.setRegion(polarisServiceInstance.getPolarisInstance().getRegion()); + instance.setZone(polarisServiceInstance.getPolarisInstance().getZone()); + instance.setCampus(polarisServiceInstance.getPolarisInstance().getCampus()); + } - return new DefaultServiceInstances(serviceKey, instances.get()); + return instance; } }