fix router concurrent bug&fix spring-retry circuit breaker not work bug

pull/676/head
lepdou 3 years ago
parent e26520ea54
commit ed4d977605

@ -89,7 +89,7 @@
<properties> <properties>
<!-- Project revision --> <!-- Project revision -->
<revision>1.8.1-Hoxton.SR12</revision> <revision>1.8.2-Hoxton.SR12-SNAPSHOT</revision>
<!-- Spring Cloud --> <!-- Spring Cloud -->
<spring.cloud.version>Hoxton.SR12</spring.cloud.version> <spring.cloud.version>Hoxton.SR12</spring.cloud.version>

@ -20,6 +20,7 @@ package com.tencent.cloud.polaris.router;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import com.netflix.client.config.IClientConfig; import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule; import com.netflix.loadbalancer.AbstractLoadBalancerRule;
@ -37,6 +38,7 @@ import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.pojo.PolarisServer; import com.tencent.cloud.common.pojo.PolarisServer;
import com.tencent.cloud.common.util.JacksonUtils; import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.polaris.loadbalancer.LoadBalancerUtils; import com.tencent.cloud.polaris.loadbalancer.LoadBalancerUtils;
import com.tencent.cloud.polaris.loadbalancer.PolarisLoadBalancer;
import com.tencent.cloud.polaris.loadbalancer.PolarisWeightedRule; import com.tencent.cloud.polaris.loadbalancer.PolarisWeightedRule;
import com.tencent.cloud.polaris.loadbalancer.config.PolarisLoadBalancerProperties; import com.tencent.cloud.polaris.loadbalancer.config.PolarisLoadBalancerProperties;
import com.tencent.cloud.polaris.router.spi.RouterRequestInterceptor; import com.tencent.cloud.polaris.router.spi.RouterRequestInterceptor;
@ -84,6 +86,8 @@ public class PolarisLoadBalancerCompositeRule extends AbstractLoadBalancerRule {
private final AbstractLoadBalancerRule delegateRule; private final AbstractLoadBalancerRule delegateRule;
private final AtomicBoolean initializedLoadBalancerToDelegateRule = new AtomicBoolean(false);
public PolarisLoadBalancerCompositeRule(RouterAPI routerAPI, public PolarisLoadBalancerCompositeRule(RouterAPI routerAPI,
PolarisLoadBalancerProperties polarisLoadBalancerProperties, PolarisLoadBalancerProperties polarisLoadBalancerProperties,
IClientConfig iClientConfig, IClientConfig iClientConfig,
@ -111,22 +115,31 @@ public class PolarisLoadBalancerCompositeRule extends AbstractLoadBalancerRule {
@Override @Override
public Server choose(Object key) { public Server choose(Object key) {
// 1. get all servers ILoadBalancer loadBalancer = getLoadBalancer();
List<Server> allServers = getLoadBalancer().getReachableServers();
if (!(loadBalancer instanceof PolarisLoadBalancer)) {
return loadBalancer.chooseServer(key);
}
// set load balancer to delegate rule
if (!initializedLoadBalancerToDelegateRule.compareAndSet(false, true)) {
delegateRule.setLoadBalancer(loadBalancer);
}
PolarisLoadBalancer polarisLoadBalancer = (PolarisLoadBalancer) loadBalancer;
// 1. get all servers from polaris client
List<Server> allServers = polarisLoadBalancer.getReachableServersWithoutCache();
if (CollectionUtils.isEmpty(allServers)) { if (CollectionUtils.isEmpty(allServers)) {
return null; return null;
} }
ILoadBalancer loadBalancer = new SimpleLoadBalancer();
// 2. filter by router // 2. filter by router
List<Server> serversAfterRouter;
if (key instanceof PolarisRouterContext) { if (key instanceof PolarisRouterContext) {
// router implement for Feign and scg // router implement for Feign and scg
PolarisRouterContext routerContext = (PolarisRouterContext) key; PolarisRouterContext routerContext = (PolarisRouterContext) key;
List<Server> serversAfterRouter = doRouter(allServers, routerContext); serversAfterRouter = doRouter(allServers, routerContext);
// 3. filter by load balance.
// A LoadBalancer needs to be regenerated for each request,
// because the list of servers may be different after filtered by router
loadBalancer.addServers(serversAfterRouter);
} }
else if (key instanceof HttpRequest) { else if (key instanceof HttpRequest) {
// router implement for rest template // router implement for rest template
@ -135,20 +148,20 @@ public class PolarisLoadBalancerCompositeRule extends AbstractLoadBalancerRule {
String routerContextStr = request.getHeaders().getFirst(RouterConstant.HEADER_ROUTER_CONTEXT); String routerContextStr = request.getHeaders().getFirst(RouterConstant.HEADER_ROUTER_CONTEXT);
if (StringUtils.isEmpty(routerContextStr)) { if (StringUtils.isEmpty(routerContextStr)) {
loadBalancer.addServers(allServers); serversAfterRouter = allServers;
} }
else { else {
PolarisRouterContext routerContext = JacksonUtils.deserialize(UriEncoder.decode(routerContextStr), PolarisRouterContext routerContext = JacksonUtils.deserialize(UriEncoder.decode(routerContextStr),
PolarisRouterContext.class); PolarisRouterContext.class);
List<Server> serversAfterRouter = doRouter(allServers, routerContext); serversAfterRouter = doRouter(allServers, routerContext);
loadBalancer.addServers(serversAfterRouter);
} }
} }
else { else {
loadBalancer.addServers(allServers); serversAfterRouter = allServers;
} }
delegateRule.setLoadBalancer(loadBalancer); // 3. put filtered servers to thread local, so delegate rule choose servers from filtered servers.
polarisLoadBalancer.addServers(serversAfterRouter);
return delegateRule.choose(key); return delegateRule.choose(key);
} }

@ -70,7 +70,7 @@
</developers> </developers>
<properties> <properties>
<revision>1.8.1-Hoxton.SR12</revision> <revision>1.8.2-Hoxton.SR12-SNAPSHOT</revision>
<polaris.version>1.9.1</polaris.version> <polaris.version>1.9.1</polaris.version>
<logback.version>1.2.11</logback.version> <logback.version>1.2.11</logback.version>
<mocktio.version>4.5.1</mocktio.version> <mocktio.version>4.5.1</mocktio.version>

@ -52,6 +52,7 @@ import org.springframework.util.CollectionUtils;
* @author Haotian Zhang * @author Haotian Zhang
*/ */
public class PolarisLoadBalancer extends DynamicServerListLoadBalancer<Server> { public class PolarisLoadBalancer extends DynamicServerListLoadBalancer<Server> {
private static final ThreadLocal<List<Server>> THREAD_CACHE_SERVERS = new ThreadLocal<>();
private final ConsumerAPI consumerAPI; private final ConsumerAPI consumerAPI;
@ -64,8 +65,21 @@ public class PolarisLoadBalancer extends DynamicServerListLoadBalancer<Server> {
this.polarisLoadBalancerProperties = properties; this.polarisLoadBalancerProperties = properties;
} }
@Override
public void addServers(List<Server> servers) {
THREAD_CACHE_SERVERS.set(servers);
}
@Override @Override
public List<Server> getReachableServers() { public List<Server> getReachableServers() {
// Get servers first from the thread context
if (!CollectionUtils.isEmpty(THREAD_CACHE_SERVERS.get())) {
return THREAD_CACHE_SERVERS.get();
}
return getReachableServersWithoutCache();
}
public List<Server> getReachableServersWithoutCache() {
ServiceInstances serviceInstances; ServiceInstances serviceInstances;
if (polarisLoadBalancerProperties.getDiscoveryType().equals(ContextConstant.POLARIS)) { if (polarisLoadBalancerProperties.getDiscoveryType().equals(ContextConstant.POLARIS)) {
serviceInstances = getPolarisDiscoveryServiceInstances(); serviceInstances = getPolarisDiscoveryServiceInstances();

Loading…
Cancel
Save