@ -15,9 +15,10 @@
* specific language governing permissions and limitations under the License .
* /
package com.tencent.cloud.polaris. rout er;
package com.tencent.cloud.polaris. loadbalanc er;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.List ;
import java.util.Map ;
@ -28,49 +29,105 @@ import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.PollingServerListUpdater ;
import com.netflix.loadbalancer.Server ;
import com.netflix.loadbalancer.ServerList ;
import com.tencent.cloud.common.constant.ContextConstant ;
import com.tencent.cloud.common.constant.MetadataConstant.SystemMetadataKey ;
import com.tencent.cloud.common.metadata.MetadataContext ;
import com.tencent.cloud.common.metadata.MetadataContextHolder ;
import com.tencent.cloud.common.pojo.PolarisServer ;
import com.tencent.cloud.polaris.loadbalancer.config.PolarisLoadBalancerProperties ;
import com.tencent.polaris.api.core.ConsumerAPI ;
import com.tencent.polaris.api.pojo.DefaultInstance ;
import com.tencent.polaris.api.pojo.DefaultServiceInstances ;
import com.tencent.polaris.api.pojo.Instance ;
import com.tencent.polaris.api.pojo.ServiceInfo ;
import com.tencent.polaris.api.pojo.ServiceInstances ;
import com.tencent.polaris.api.pojo.ServiceKey ;
import com.tencent.polaris.api.rpc.GetAllInstancesRequest ;
import com.tencent.polaris.api.rpc.InstancesResponse ;
import com.tencent.polaris.router.api.core.RouterAPI ;
import com.tencent.polaris.router.api.rpc.ProcessRoutersRequest ;
import com.tencent.polaris.router.api.rpc.ProcessRoutersResponse ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.commons.lang.StringUtils ;
import org.springframework.util.CollectionUtils ;
/ * *
* Routing load balancer of polaris .
*
* @author Haotian Zhang
* /
public class Polaris Routing LoadBalancer extends DynamicServerListLoadBalancer < Server > {
public class Polaris LoadBalancer extends DynamicServerListLoadBalancer < Server > {
private final RouterAPI routerAPI ;
public PolarisRoutingLoadBalancer ( IClientConfig config , IRule rule , IPing ping ,
ServerList < Server > serverList , RouterAPI routerAPI ) {
private ConsumerAPI consumerAPI ;
private PolarisLoadBalancerProperties polarisLoadBalancerProperties ;
public PolarisLoadBalancer ( IClientConfig config , IRule rule , IPing ping , ServerList < Server > serverList ,
RouterAPI routerAPI , ConsumerAPI consumerAPI , PolarisLoadBalancerProperties properties ) {
super ( config , rule , ping , serverList , null , new PollingServerListUpdater ( ) ) ;
this . routerAPI = routerAPI ;
this . consumerAPI = consumerAPI ;
this . polarisLoadBalancerProperties = properties ;
}
@Override
public List < Server > getReachableServers ( ) {
ServiceInstances serviceInstances ;
if ( polarisLoadBalancerProperties . getDiscoveryType ( ) . equals ( ContextConstant . POLARIS ) ) {
serviceInstances = getPolarisDiscoveryServiceInstances ( ) ;
}
else {
serviceInstances = getExtendDiscoveryServiceInstances ( ) ;
}
if ( serviceInstances = = null | | CollectionUtils . isEmpty ( serviceInstances . getInstances ( ) ) ) {
return Collections . emptyList ( ) ;
}
ProcessRoutersRequest processRoutersRequest = new ProcessRoutersRequest ( ) ;
processRoutersRequest . setDstInstances ( serviceInstances ) ;
String srcNamespace = MetadataContext . LOCAL_NAMESPACE ;
String srcService = MetadataContext . LOCAL_SERVICE ;
Map < String , String > transitiveCustomMetadata = MetadataContextHolder . get ( )
. getAllTransitiveCustomMetadata ( ) ;
String method = MetadataContextHolder . get ( )
. getSystemMetadata ( SystemMetadataKey . PEER_PATH ) ;
processRoutersRequest . setMethod ( method ) ;
if ( StringUtils . isNotBlank ( srcNamespace ) & & StringUtils . isNotBlank ( srcService ) ) {
ServiceInfo serviceInfo = new ServiceInfo ( ) ;
serviceInfo . setNamespace ( srcNamespace ) ;
serviceInfo . setService ( srcService ) ;
serviceInfo . setMetadata ( transitiveCustomMetadata ) ;
processRoutersRequest . setSourceService ( serviceInfo ) ;
}
ProcessRoutersResponse processRoutersResponse = routerAPI
. processRouters ( processRoutersRequest ) ;
ServiceInstances filteredServiceInstances = processRoutersResponse
. getServiceInstances ( ) ;
List < Server > filteredInstances = new ArrayList < > ( ) ;
for ( Instance instance : filteredServiceInstances . getInstances ( ) ) {
filteredInstances . add ( new PolarisServer ( serviceInstances , instance ) ) ;
}
return filteredInstances ;
}
private ServiceInstances getPolarisDiscoveryServiceInstances ( ) {
String serviceName = MetadataContextHolder . get ( ) . getSystemMetadata ( SystemMetadataKey . PEER_SERVICE ) ;
if ( StringUtils . isBlank ( serviceName ) ) {
List < Server > allServers = super . getAllServers ( ) ;
if ( CollectionUtils . isEmpty ( allServers ) ) {
return allServers ;
return null ;
}
ServiceInstances serviceInstances = null ;
if ( allServers . get ( 0 ) instanceof PolarisServer ) {
serviceInstances = ( ( PolarisServer ) allServers . get ( 0 ) ) . getServiceInstances ( ) ;
serviceName = ( ( PolarisServer ) super . getAllServers ( ) . get ( 0 ) ) . getServiceInstances ( ) . getService ( ) ;
}
else {
return getAllInstances ( MetadataContext . LOCAL_NAMESPACE , serviceName ) . toServiceInstances ( ) ;
}
private ServiceInstances getExtendDiscoveryServiceInstances ( ) {
List < Server > allServers = super . getAllServers ( ) ;
if ( CollectionUtils . isEmpty ( allServers ) ) {
return null ;
}
ServiceInstances serviceInstances ;
String serviceName ;
// notice the difference between different service registries
if ( StringUtils . isNotBlank (
@ -82,7 +139,7 @@ public class PolarisRoutingLoadBalancer extends DynamicServerListLoadBalancer<Se
}
if ( StringUtils . isBlank ( serviceName ) ) {
throw new IllegalStateException (
"Polaris Routing LoadBalancer only Server with AppName or ServiceIdForDiscovery attribute") ;
"Polaris LoadBalancer only Server with AppName or ServiceIdForDiscovery attribute") ;
}
ServiceKey serviceKey = new ServiceKey ( MetadataContext . LOCAL_NAMESPACE ,
serviceName ) ;
@ -101,32 +158,7 @@ public class PolarisRoutingLoadBalancer extends DynamicServerListLoadBalancer<Se
instances . add ( instance ) ;
}
serviceInstances = new DefaultServiceInstances ( serviceKey , instances ) ;
}
ProcessRoutersRequest processRoutersRequest = new ProcessRoutersRequest ( ) ;
processRoutersRequest . setDstInstances ( serviceInstances ) ;
String srcNamespace = MetadataContext . LOCAL_NAMESPACE ;
String srcService = MetadataContext . LOCAL_SERVICE ;
Map < String , String > transitiveCustomMetadata = MetadataContextHolder . get ( )
. getAllTransitiveCustomMetadata ( ) ;
String method = MetadataContextHolder . get ( )
. getSystemMetadata ( SystemMetadataKey . PEER_PATH ) ;
processRoutersRequest . setMethod ( method ) ;
if ( StringUtils . isNotBlank ( srcNamespace ) & & StringUtils . isNotBlank ( srcService ) ) {
ServiceInfo serviceInfo = new ServiceInfo ( ) ;
serviceInfo . setNamespace ( srcNamespace ) ;
serviceInfo . setService ( srcService ) ;
serviceInfo . setMetadata ( transitiveCustomMetadata ) ;
processRoutersRequest . setSourceService ( serviceInfo ) ;
}
ProcessRoutersResponse processRoutersResponse = routerAPI
. processRouters ( processRoutersRequest ) ;
ServiceInstances filteredServiceInstances = processRoutersResponse
. getServiceInstances ( ) ;
List < Server > filteredInstances = new ArrayList < > ( ) ;
for ( Instance instance : filteredServiceInstances . getInstances ( ) ) {
filteredInstances . add ( new PolarisServer ( serviceInstances , instance ) ) ;
}
return filteredInstances ;
return serviceInstances ;
}
@Override
@ -134,4 +166,17 @@ public class PolarisRoutingLoadBalancer extends DynamicServerListLoadBalancer<Se
return getReachableServers ( ) ;
}
/ * *
* Get a list of instances .
* @param namespace namespace
* @param serviceName service name
* @return list of instances
* /
public InstancesResponse getAllInstances ( String namespace , String serviceName ) {
GetAllInstancesRequest request = new GetAllInstancesRequest ( ) ;
request . setNamespace ( namespace ) ;
request . setService ( serviceName ) ;
return consumerAPI . getAllInstance ( request ) ;
}
}