get service instances by Flux.blockLast() to resolve concurrent problem (#764)

pull/772/head
lepdou 2 years ago committed by GitHub
parent 26b7e3d1c3
commit 6e8e5f7268
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,3 +18,4 @@
- [fix:fix discovery junit.](https://github.com/Tencent/spring-cloud-tencent/pull/730) - [fix:fix discovery junit.](https://github.com/Tencent/spring-cloud-tencent/pull/730)
- [adapt polaris-java 1.10.1 version](https://github.com/Tencent/spring-cloud-tencent/pull/746) - [adapt polaris-java 1.10.1 version](https://github.com/Tencent/spring-cloud-tencent/pull/746)
- [Optimize: change RouteArgument.buildCustom to RouteArgument.fromLabel](https://github.com/Tencent/spring-cloud-tencent/pull/749) - [Optimize: change RouteArgument.buildCustom to RouteArgument.fromLabel](https://github.com/Tencent/spring-cloud-tencent/pull/749)
- [Optimize: get service instances by Flux.blockLast() to resolve concurrent problem](https://github.com/Tencent/spring-cloud-tencent/pull/764)

@ -18,10 +18,8 @@
package com.tencent.cloud.polaris.loadbalancer; package com.tencent.cloud.polaris.loadbalancer;
import java.util.Collections; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.pojo.PolarisServiceInstance; import com.tencent.cloud.common.pojo.PolarisServiceInstance;
@ -42,44 +40,60 @@ import org.springframework.util.CollectionUtils;
*/ */
public final class LoadBalancerUtils { public final class LoadBalancerUtils {
private static final int DEFAULT_WEIGHT = 100;
private LoadBalancerUtils() { private LoadBalancerUtils() {
} }
/**
* transfer servers to ServiceInstances.
*
* @param servers servers
* @return ServiceInstances
*/
public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers) { public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers) {
AtomicReference<List<Instance>> instances = new AtomicReference<>(); List<ServiceInstance> serviceInstances = servers.blockLast();
servers.subscribe(serviceInstances -> {
instances.set(serviceInstances.stream().map(serviceInstance -> {
DefaultInstance instance = new DefaultInstance();
instance.setNamespace(MetadataContext.LOCAL_NAMESPACE);
instance.setService(serviceInstance.getServiceId());
instance.setProtocol(serviceInstance.getScheme());
instance.setId(serviceInstance.getInstanceId());
instance.setHost(serviceInstance.getHost());
instance.setPort(serviceInstance.getPort());
instance.setWeight(100);
instance.setMetadata(serviceInstance.getMetadata());
if (serviceInstance instanceof PolarisServiceInstance) {
PolarisServiceInstance polarisServiceInstance = (PolarisServiceInstance) serviceInstance;
instance.setRegion(polarisServiceInstance.getPolarisInstance().getRegion());
instance.setZone(polarisServiceInstance.getPolarisInstance().getZone());
instance.setCampus(polarisServiceInstance.getPolarisInstance().getCampus());
}
return instance; List<Instance> instances = new ArrayList<>();
}).collect(Collectors.toList())); if (!CollectionUtils.isEmpty(serviceInstances)) {
}); for (ServiceInstance serviceInstance : serviceInstances) {
instances.add(transferServerToServiceInstance(serviceInstance));
}
}
String serviceName = null; String serviceName = null;
if (CollectionUtils.isEmpty(instances.get())) { if (!CollectionUtils.isEmpty(instances)) {
instances.set(Collections.emptyList()); serviceName = instances.get(0).getService();
}
else {
serviceName = instances.get().get(0).getService();
} }
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName); ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName);
return new DefaultServiceInstances(serviceKey, instances);
}
/**
* transfer ServiceInstance to DefaultInstance.
*
* @param serviceInstance serviceInstance
* @return defaultInstance
*/
public static DefaultInstance transferServerToServiceInstance(ServiceInstance serviceInstance) {
DefaultInstance instance = new DefaultInstance();
instance.setNamespace(MetadataContext.LOCAL_NAMESPACE);
instance.setService(serviceInstance.getServiceId());
instance.setProtocol(serviceInstance.getScheme());
instance.setId(serviceInstance.getInstanceId());
instance.setHost(serviceInstance.getHost());
instance.setPort(serviceInstance.getPort());
instance.setWeight(DEFAULT_WEIGHT);
instance.setMetadata(serviceInstance.getMetadata());
if (serviceInstance instanceof PolarisServiceInstance) {
PolarisServiceInstance polarisServiceInstance = (PolarisServiceInstance) serviceInstance;
instance.setRegion(polarisServiceInstance.getPolarisInstance().getRegion());
instance.setZone(polarisServiceInstance.getPolarisInstance().getZone());
instance.setCampus(polarisServiceInstance.getPolarisInstance().getCampus());
}
return new DefaultServiceInstances(serviceKey, instances.get()); return instance;
} }
} }

Loading…
Cancel
Save