From 7af4d3c026de61696a006a10ec7c5ebcf11169fd Mon Sep 17 00:00:00 2001 From: veteranchen Date: Thu, 20 Jul 2023 23:34:12 +0800 Subject: [PATCH] =?UTF-8?q?polaris=20load=20balancer:=20=E6=8F=90=E5=8F=96?= =?UTF-8?q?=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1=E5=99=A8=E5=85=AC=E5=85=B1?= =?UTF-8?q?=E6=8A=BD=E8=B1=A1=E5=9F=BA=E7=B1=BB=EF=BC=8C=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=8A=A0=E6=9D=83=E8=BD=AE=E8=AE=AD=E8=B4=9F=E8=BD=BD=E5=9D=87?= =?UTF-8?q?=E8=A1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --issue=968 --- .../PolarisAbstractLoadBalancer.java | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisAbstractLoadBalancer.java diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisAbstractLoadBalancer.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisAbstractLoadBalancer.java new file mode 100644 index 000000000..94acdb7a4 --- /dev/null +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisAbstractLoadBalancer.java @@ -0,0 +1,82 @@ +package com.tencent.cloud.polaris.loadbalancer; + +import com.tencent.cloud.common.metadata.MetadataContext; +import com.tencent.cloud.common.pojo.PolarisServiceInstance; +import com.tencent.polaris.api.pojo.DefaultServiceInstances; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceInstances; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.router.api.core.RouterAPI; +import com.tencent.polaris.router.api.rpc.ProcessLoadBalanceRequest; +import com.tencent.polaris.router.api.rpc.ProcessLoadBalanceResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.loadbalancer.DefaultResponse; +import org.springframework.cloud.client.loadbalancer.EmptyResponse; +import org.springframework.cloud.client.loadbalancer.Request; +import org.springframework.cloud.client.loadbalancer.Response; +import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier; +import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer; +import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Abstract Loadbalancer of Polaris. + * + * @author veteranchen + */ +public abstract class PolarisAbstractLoadBalancer implements ReactorServiceInstanceLoadBalancer { + + private static final Logger log = LoggerFactory.getLogger(PolarisAbstractLoadBalancer.class); + + private final String serviceId; + + private final RouterAPI routerAPI; + + private ObjectProvider supplierObjectProvider; + + public PolarisAbstractLoadBalancer(String serviceId, ObjectProvider supplierObjectProvider, RouterAPI routerAPI) { + this.serviceId = serviceId; + this.supplierObjectProvider = supplierObjectProvider; + this.routerAPI = routerAPI; + } + + private static ServiceInstances convertToPolarisServiceInstances(List serviceInstances) { + ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceInstances.get(0).getServiceId()); + List polarisInstances = serviceInstances.stream() + .map(serviceInstance -> ((PolarisServiceInstance) serviceInstance).getPolarisInstance()) + .collect(Collectors.toList()); + return new DefaultServiceInstances(serviceKey, polarisInstances); + } + + @Override + public Mono> choose(Request request) { + ServiceInstanceListSupplier supplier = supplierObjectProvider + .getIfAvailable(NoopServiceInstanceListSupplier::new); + return supplier.get(request).next().map(serviceInstances -> { + if (serviceInstances.isEmpty()) { + log.warn("No servers available for service: " + this.serviceId); + return new EmptyResponse(); + } + + ProcessLoadBalanceRequest req = new ProcessLoadBalanceRequest(); + req.setDstInstances(convertToPolarisServiceInstances(serviceInstances)); + req = setProcessLoadBalanceRequest(req); + + try { + ProcessLoadBalanceResponse response = routerAPI.processLoadBalance(req); + return new DefaultResponse(new PolarisServiceInstance(response.getTargetInstance())); + } catch (Exception e) { + log.warn("PolarisRoutingLoadbalancer error", e); + return new EmptyResponse(); + } + }); + } + + protected abstract ProcessLoadBalanceRequest setProcessLoadBalanceRequest(ProcessLoadBalanceRequest req); +}