|
|
|
@ -15,9 +15,10 @@
|
|
|
|
|
* specific language governing permissions and limitations under the License.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package com.tencent.cloud.polaris.router;
|
|
|
|
|
package com.tencent.cloud.polaris.loadbalancer;
|
|
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
|
@ -28,49 +29,105 @@ import com.netflix.loadbalancer.IRule;
|
|
|
|
|
import com.netflix.loadbalancer.PollingServerListUpdater;
|
|
|
|
|
import com.netflix.loadbalancer.Server;
|
|
|
|
|
import com.netflix.loadbalancer.ServerList;
|
|
|
|
|
import com.tencent.cloud.common.constant.ContextConstant;
|
|
|
|
|
import com.tencent.cloud.common.constant.MetadataConstant.SystemMetadataKey;
|
|
|
|
|
import com.tencent.cloud.common.metadata.MetadataContext;
|
|
|
|
|
import com.tencent.cloud.common.metadata.MetadataContextHolder;
|
|
|
|
|
import com.tencent.cloud.common.pojo.PolarisServer;
|
|
|
|
|
import com.tencent.cloud.polaris.loadbalancer.config.PolarisLoadBalancerProperties;
|
|
|
|
|
import com.tencent.polaris.api.core.ConsumerAPI;
|
|
|
|
|
import com.tencent.polaris.api.pojo.DefaultInstance;
|
|
|
|
|
import com.tencent.polaris.api.pojo.DefaultServiceInstances;
|
|
|
|
|
import com.tencent.polaris.api.pojo.Instance;
|
|
|
|
|
import com.tencent.polaris.api.pojo.ServiceInfo;
|
|
|
|
|
import com.tencent.polaris.api.pojo.ServiceInstances;
|
|
|
|
|
import com.tencent.polaris.api.pojo.ServiceKey;
|
|
|
|
|
import com.tencent.polaris.api.rpc.GetAllInstancesRequest;
|
|
|
|
|
import com.tencent.polaris.api.rpc.InstancesResponse;
|
|
|
|
|
import com.tencent.polaris.router.api.core.RouterAPI;
|
|
|
|
|
import com.tencent.polaris.router.api.rpc.ProcessRoutersRequest;
|
|
|
|
|
import com.tencent.polaris.router.api.rpc.ProcessRoutersResponse;
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
|
|
|
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Routing load balancer of polaris.
|
|
|
|
|
*
|
|
|
|
|
* @author Haotian Zhang
|
|
|
|
|
*/
|
|
|
|
|
public class PolarisRoutingLoadBalancer extends DynamicServerListLoadBalancer<Server> {
|
|
|
|
|
public class PolarisLoadBalancer extends DynamicServerListLoadBalancer<Server> {
|
|
|
|
|
|
|
|
|
|
private final RouterAPI routerAPI;
|
|
|
|
|
|
|
|
|
|
public PolarisRoutingLoadBalancer(IClientConfig config, IRule rule, IPing ping,
|
|
|
|
|
ServerList<Server> serverList, RouterAPI routerAPI) {
|
|
|
|
|
private ConsumerAPI consumerAPI;
|
|
|
|
|
|
|
|
|
|
private PolarisLoadBalancerProperties polarisLoadBalancerProperties;
|
|
|
|
|
|
|
|
|
|
public PolarisLoadBalancer(IClientConfig config, IRule rule, IPing ping, ServerList<Server> serverList,
|
|
|
|
|
RouterAPI routerAPI, ConsumerAPI consumerAPI, PolarisLoadBalancerProperties properties) {
|
|
|
|
|
super(config, rule, ping, serverList, null, new PollingServerListUpdater());
|
|
|
|
|
this.routerAPI = routerAPI;
|
|
|
|
|
this.consumerAPI = consumerAPI;
|
|
|
|
|
this.polarisLoadBalancerProperties = properties;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public List<Server> getReachableServers() {
|
|
|
|
|
ServiceInstances serviceInstances;
|
|
|
|
|
if (polarisLoadBalancerProperties.getDiscoveryType().equals(ContextConstant.POLARIS)) {
|
|
|
|
|
serviceInstances = getPolarisDiscoveryServiceInstances();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
serviceInstances = getExtendDiscoveryServiceInstances();
|
|
|
|
|
}
|
|
|
|
|
if (serviceInstances == null || CollectionUtils.isEmpty(serviceInstances.getInstances())) {
|
|
|
|
|
return Collections.emptyList();
|
|
|
|
|
}
|
|
|
|
|
ProcessRoutersRequest processRoutersRequest = new ProcessRoutersRequest();
|
|
|
|
|
processRoutersRequest.setDstInstances(serviceInstances);
|
|
|
|
|
String srcNamespace = MetadataContext.LOCAL_NAMESPACE;
|
|
|
|
|
String srcService = MetadataContext.LOCAL_SERVICE;
|
|
|
|
|
Map<String, String> transitiveCustomMetadata = MetadataContextHolder.get()
|
|
|
|
|
.getAllTransitiveCustomMetadata();
|
|
|
|
|
String method = MetadataContextHolder.get()
|
|
|
|
|
.getSystemMetadata(SystemMetadataKey.PEER_PATH);
|
|
|
|
|
processRoutersRequest.setMethod(method);
|
|
|
|
|
if (StringUtils.isNotBlank(srcNamespace) && StringUtils.isNotBlank(srcService)) {
|
|
|
|
|
ServiceInfo serviceInfo = new ServiceInfo();
|
|
|
|
|
serviceInfo.setNamespace(srcNamespace);
|
|
|
|
|
serviceInfo.setService(srcService);
|
|
|
|
|
serviceInfo.setMetadata(transitiveCustomMetadata);
|
|
|
|
|
processRoutersRequest.setSourceService(serviceInfo);
|
|
|
|
|
}
|
|
|
|
|
ProcessRoutersResponse processRoutersResponse = routerAPI
|
|
|
|
|
.processRouters(processRoutersRequest);
|
|
|
|
|
ServiceInstances filteredServiceInstances = processRoutersResponse
|
|
|
|
|
.getServiceInstances();
|
|
|
|
|
List<Server> filteredInstances = new ArrayList<>();
|
|
|
|
|
for (Instance instance : filteredServiceInstances.getInstances()) {
|
|
|
|
|
filteredInstances.add(new PolarisServer(serviceInstances, instance));
|
|
|
|
|
}
|
|
|
|
|
return filteredInstances;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private ServiceInstances getPolarisDiscoveryServiceInstances() {
|
|
|
|
|
String serviceName = MetadataContextHolder.get().getSystemMetadata(SystemMetadataKey.PEER_SERVICE);
|
|
|
|
|
if (StringUtils.isBlank(serviceName)) {
|
|
|
|
|
List<Server> allServers = super.getAllServers();
|
|
|
|
|
if (CollectionUtils.isEmpty(allServers)) {
|
|
|
|
|
return allServers;
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
ServiceInstances serviceInstances = null;
|
|
|
|
|
if (allServers.get(0) instanceof PolarisServer) {
|
|
|
|
|
serviceInstances = ((PolarisServer) allServers.get(0)).getServiceInstances();
|
|
|
|
|
serviceName = ((PolarisServer) super.getAllServers().get(0)).getServiceInstances().getService();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
return getAllInstances(MetadataContext.LOCAL_NAMESPACE, serviceName).toServiceInstances();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private ServiceInstances getExtendDiscoveryServiceInstances() {
|
|
|
|
|
List<Server> allServers = super.getAllServers();
|
|
|
|
|
if (CollectionUtils.isEmpty(allServers)) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
ServiceInstances serviceInstances;
|
|
|
|
|
String serviceName;
|
|
|
|
|
// notice the difference between different service registries
|
|
|
|
|
if (StringUtils.isNotBlank(
|
|
|
|
@ -82,7 +139,7 @@ public class PolarisRoutingLoadBalancer extends DynamicServerListLoadBalancer<Se
|
|
|
|
|
}
|
|
|
|
|
if (StringUtils.isBlank(serviceName)) {
|
|
|
|
|
throw new IllegalStateException(
|
|
|
|
|
"PolarisRoutingLoadBalancer only Server with AppName or ServiceIdForDiscovery attribute");
|
|
|
|
|
"PolarisLoadBalancer only Server with AppName or ServiceIdForDiscovery attribute");
|
|
|
|
|
}
|
|
|
|
|
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE,
|
|
|
|
|
serviceName);
|
|
|
|
@ -101,32 +158,7 @@ public class PolarisRoutingLoadBalancer extends DynamicServerListLoadBalancer<Se
|
|
|
|
|
instances.add(instance);
|
|
|
|
|
}
|
|
|
|
|
serviceInstances = new DefaultServiceInstances(serviceKey, instances);
|
|
|
|
|
}
|
|
|
|
|
ProcessRoutersRequest processRoutersRequest = new ProcessRoutersRequest();
|
|
|
|
|
processRoutersRequest.setDstInstances(serviceInstances);
|
|
|
|
|
String srcNamespace = MetadataContext.LOCAL_NAMESPACE;
|
|
|
|
|
String srcService = MetadataContext.LOCAL_SERVICE;
|
|
|
|
|
Map<String, String> transitiveCustomMetadata = MetadataContextHolder.get()
|
|
|
|
|
.getAllTransitiveCustomMetadata();
|
|
|
|
|
String method = MetadataContextHolder.get()
|
|
|
|
|
.getSystemMetadata(SystemMetadataKey.PEER_PATH);
|
|
|
|
|
processRoutersRequest.setMethod(method);
|
|
|
|
|
if (StringUtils.isNotBlank(srcNamespace) && StringUtils.isNotBlank(srcService)) {
|
|
|
|
|
ServiceInfo serviceInfo = new ServiceInfo();
|
|
|
|
|
serviceInfo.setNamespace(srcNamespace);
|
|
|
|
|
serviceInfo.setService(srcService);
|
|
|
|
|
serviceInfo.setMetadata(transitiveCustomMetadata);
|
|
|
|
|
processRoutersRequest.setSourceService(serviceInfo);
|
|
|
|
|
}
|
|
|
|
|
ProcessRoutersResponse processRoutersResponse = routerAPI
|
|
|
|
|
.processRouters(processRoutersRequest);
|
|
|
|
|
ServiceInstances filteredServiceInstances = processRoutersResponse
|
|
|
|
|
.getServiceInstances();
|
|
|
|
|
List<Server> filteredInstances = new ArrayList<>();
|
|
|
|
|
for (Instance instance : filteredServiceInstances.getInstances()) {
|
|
|
|
|
filteredInstances.add(new PolarisServer(serviceInstances, instance));
|
|
|
|
|
}
|
|
|
|
|
return filteredInstances;
|
|
|
|
|
return serviceInstances;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -134,4 +166,17 @@ public class PolarisRoutingLoadBalancer extends DynamicServerListLoadBalancer<Se
|
|
|
|
|
return getReachableServers();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get a list of instances.
|
|
|
|
|
* @param namespace namespace
|
|
|
|
|
* @param serviceName service name
|
|
|
|
|
* @return list of instances
|
|
|
|
|
*/
|
|
|
|
|
public InstancesResponse getAllInstances(String namespace, String serviceName) {
|
|
|
|
|
GetAllInstancesRequest request = new GetAllInstancesRequest();
|
|
|
|
|
request.setNamespace(namespace);
|
|
|
|
|
request.setService(serviceName);
|
|
|
|
|
return consumerAPI.getAllInstance(request);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|