From ed4d97760579d508c862a72e1922028be46edff6 Mon Sep 17 00:00:00 2001 From: lepdou Date: Mon, 31 Oct 2022 19:43:34 +0800 Subject: [PATCH] fix router concurrent bug&fix spring-retry circuit breaker not work bug --- pom.xml | 2 +- .../PolarisLoadBalancerCompositeRule.java | 39 ++++++++++++------- spring-cloud-tencent-dependencies/pom.xml | 2 +- .../loadbalancer/PolarisLoadBalancer.java | 14 +++++++ 4 files changed, 42 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index ac88a75bb..d75106f5a 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ - 1.8.1-Hoxton.SR12 + 1.8.2-Hoxton.SR12-SNAPSHOT Hoxton.SR12 diff --git a/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/PolarisLoadBalancerCompositeRule.java b/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/PolarisLoadBalancerCompositeRule.java index 7581ca469..947444de4 100644 --- a/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/PolarisLoadBalancerCompositeRule.java +++ b/spring-cloud-starter-tencent-polaris-router/src/main/java/com/tencent/cloud/polaris/router/PolarisLoadBalancerCompositeRule.java @@ -20,6 +20,7 @@ package com.tencent.cloud.polaris.router; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import com.netflix.client.config.IClientConfig; import com.netflix.loadbalancer.AbstractLoadBalancerRule; @@ -37,6 +38,7 @@ import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.pojo.PolarisServer; import com.tencent.cloud.common.util.JacksonUtils; import com.tencent.cloud.polaris.loadbalancer.LoadBalancerUtils; +import com.tencent.cloud.polaris.loadbalancer.PolarisLoadBalancer; import com.tencent.cloud.polaris.loadbalancer.PolarisWeightedRule; import com.tencent.cloud.polaris.loadbalancer.config.PolarisLoadBalancerProperties; import com.tencent.cloud.polaris.router.spi.RouterRequestInterceptor; @@ -84,6 +86,8 @@ public class PolarisLoadBalancerCompositeRule extends AbstractLoadBalancerRule { private final AbstractLoadBalancerRule delegateRule; + private final AtomicBoolean initializedLoadBalancerToDelegateRule = new AtomicBoolean(false); + public PolarisLoadBalancerCompositeRule(RouterAPI routerAPI, PolarisLoadBalancerProperties polarisLoadBalancerProperties, IClientConfig iClientConfig, @@ -111,22 +115,31 @@ public class PolarisLoadBalancerCompositeRule extends AbstractLoadBalancerRule { @Override public Server choose(Object key) { - // 1. get all servers - List allServers = getLoadBalancer().getReachableServers(); + ILoadBalancer loadBalancer = getLoadBalancer(); + + if (!(loadBalancer instanceof PolarisLoadBalancer)) { + return loadBalancer.chooseServer(key); + } + + // set load balancer to delegate rule + if (!initializedLoadBalancerToDelegateRule.compareAndSet(false, true)) { + delegateRule.setLoadBalancer(loadBalancer); + } + + PolarisLoadBalancer polarisLoadBalancer = (PolarisLoadBalancer) loadBalancer; + + // 1. get all servers from polaris client + List allServers = polarisLoadBalancer.getReachableServersWithoutCache(); if (CollectionUtils.isEmpty(allServers)) { return null; } - ILoadBalancer loadBalancer = new SimpleLoadBalancer(); // 2. filter by router + List serversAfterRouter; if (key instanceof PolarisRouterContext) { // router implement for Feign and scg PolarisRouterContext routerContext = (PolarisRouterContext) key; - List serversAfterRouter = doRouter(allServers, routerContext); - // 3. filter by load balance. - // A LoadBalancer needs to be regenerated for each request, - // because the list of servers may be different after filtered by router - loadBalancer.addServers(serversAfterRouter); + serversAfterRouter = doRouter(allServers, routerContext); } else if (key instanceof HttpRequest) { // router implement for rest template @@ -135,20 +148,20 @@ public class PolarisLoadBalancerCompositeRule extends AbstractLoadBalancerRule { String routerContextStr = request.getHeaders().getFirst(RouterConstant.HEADER_ROUTER_CONTEXT); if (StringUtils.isEmpty(routerContextStr)) { - loadBalancer.addServers(allServers); + serversAfterRouter = allServers; } else { PolarisRouterContext routerContext = JacksonUtils.deserialize(UriEncoder.decode(routerContextStr), PolarisRouterContext.class); - List serversAfterRouter = doRouter(allServers, routerContext); - loadBalancer.addServers(serversAfterRouter); + serversAfterRouter = doRouter(allServers, routerContext); } } else { - loadBalancer.addServers(allServers); + serversAfterRouter = allServers; } - delegateRule.setLoadBalancer(loadBalancer); + // 3. put filtered servers to thread local, so delegate rule choose servers from filtered servers. + polarisLoadBalancer.addServers(serversAfterRouter); return delegateRule.choose(key); } diff --git a/spring-cloud-tencent-dependencies/pom.xml b/spring-cloud-tencent-dependencies/pom.xml index 4622d3d69..532b7256a 100644 --- a/spring-cloud-tencent-dependencies/pom.xml +++ b/spring-cloud-tencent-dependencies/pom.xml @@ -70,7 +70,7 @@ - 1.8.1-Hoxton.SR12 + 1.8.2-Hoxton.SR12-SNAPSHOT 1.9.1 1.2.11 4.5.1 diff --git a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java index d1c4e3ae1..c3b28f18e 100644 --- a/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java +++ b/spring-cloud-tencent-polaris-loadbalancer/src/main/java/com/tencent/cloud/polaris/loadbalancer/PolarisLoadBalancer.java @@ -52,6 +52,7 @@ import org.springframework.util.CollectionUtils; * @author Haotian Zhang */ public class PolarisLoadBalancer extends DynamicServerListLoadBalancer { + private static final ThreadLocal> THREAD_CACHE_SERVERS = new ThreadLocal<>(); private final ConsumerAPI consumerAPI; @@ -64,8 +65,21 @@ public class PolarisLoadBalancer extends DynamicServerListLoadBalancer { this.polarisLoadBalancerProperties = properties; } + @Override + public void addServers(List servers) { + THREAD_CACHE_SERVERS.set(servers); + } + @Override public List getReachableServers() { + // Get servers first from the thread context + if (!CollectionUtils.isEmpty(THREAD_CACHE_SERVERS.get())) { + return THREAD_CACHE_SERVERS.get(); + } + return getReachableServersWithoutCache(); + } + + public List getReachableServersWithoutCache() { ServiceInstances serviceInstances; if (polarisLoadBalancerProperties.getDiscoveryType().equals(ContextConstant.POLARIS)) { serviceInstances = getPolarisDiscoveryServiceInstances();