diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisAbstractLoadBalancer.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisAbstractLoadBalancer.java new file mode 100644 index 000000000..94acdb7a4 --- /dev/null +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisAbstractLoadBalancer.java @@ -0,0 +1,82 @@ +package com.tencent.cloud.polaris.loadbalancer; + +import com.tencent.cloud.common.metadata.MetadataContext; +import com.tencent.cloud.common.pojo.PolarisServiceInstance; +import com.tencent.polaris.api.pojo.DefaultServiceInstances; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceInstances; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.router.api.core.RouterAPI; +import com.tencent.polaris.router.api.rpc.ProcessLoadBalanceRequest; +import com.tencent.polaris.router.api.rpc.ProcessLoadBalanceResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.loadbalancer.DefaultResponse; +import org.springframework.cloud.client.loadbalancer.EmptyResponse; +import org.springframework.cloud.client.loadbalancer.Request; +import org.springframework.cloud.client.loadbalancer.Response; +import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier; +import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer; +import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Abstract Loadbalancer of Polaris. + * + * @author veteranchen + */ +public abstract class PolarisAbstractLoadBalancer implements ReactorServiceInstanceLoadBalancer { + + private static final Logger log = LoggerFactory.getLogger(PolarisAbstractLoadBalancer.class); + + private final String serviceId; + + private final RouterAPI routerAPI; + + private ObjectProvider supplierObjectProvider; + + public PolarisAbstractLoadBalancer(String serviceId, ObjectProvider supplierObjectProvider, RouterAPI routerAPI) { + this.serviceId = serviceId; + this.supplierObjectProvider = supplierObjectProvider; + this.routerAPI = routerAPI; + } + + private static ServiceInstances convertToPolarisServiceInstances(List serviceInstances) { + ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceInstances.get(0).getServiceId()); + List polarisInstances = serviceInstances.stream() + .map(serviceInstance -> ((PolarisServiceInstance) serviceInstance).getPolarisInstance()) + .collect(Collectors.toList()); + return new DefaultServiceInstances(serviceKey, polarisInstances); + } + + @Override + public Mono> choose(Request request) { + ServiceInstanceListSupplier supplier = supplierObjectProvider + .getIfAvailable(NoopServiceInstanceListSupplier::new); + return supplier.get(request).next().map(serviceInstances -> { + if (serviceInstances.isEmpty()) { + log.warn("No servers available for service: " + this.serviceId); + return new EmptyResponse(); + } + + ProcessLoadBalanceRequest req = new ProcessLoadBalanceRequest(); + req.setDstInstances(convertToPolarisServiceInstances(serviceInstances)); + req = setProcessLoadBalanceRequest(req); + + try { + ProcessLoadBalanceResponse response = routerAPI.processLoadBalance(req); + return new DefaultResponse(new PolarisServiceInstance(response.getTargetInstance())); + } catch (Exception e) { + log.warn("PolarisRoutingLoadbalancer error", e); + return new EmptyResponse(); + } + }); + } + + protected abstract ProcessLoadBalanceRequest setProcessLoadBalanceRequest(ProcessLoadBalanceRequest req); +}