feat:支持webclient调用指标上报&返回流控的真实retStatus

pull/924/head
chuntaojun 3 years ago
parent 8975054069
commit b2081651e9

@ -22,6 +22,8 @@ import java.util.function.Supplier;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisCircuitBreakerConfigBuilder;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisResultToErrorCode;
import com.tencent.cloud.polaris.circuitbreaker.util.PolarisCircuitBreakerUtils;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.api.FunctionalDecorator;
@ -43,10 +45,18 @@ public class PolarisCircuitBreaker implements CircuitBreaker {
private final FunctionalDecorator decorator;
public PolarisCircuitBreaker(PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf, CircuitBreakAPI circuitBreakAPI) {
private final PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf;
private final ConsumerAPI consumerAPI;
public PolarisCircuitBreaker(PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf,
ConsumerAPI consumerAPI,
CircuitBreakAPI circuitBreakAPI) {
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(new ServiceKey(conf.getNamespace(), conf.getService()), conf.getMethod());
makeDecoratorRequest.setSourceService(new ServiceKey(conf.getSourceNamespace(), conf.getSourceService()));
makeDecoratorRequest.setResultToErrorCode(new PolarisResultToErrorCode());
this.consumerAPI = consumerAPI;
this.conf = conf;
this.decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
}
@ -58,6 +68,7 @@ public class PolarisCircuitBreaker implements CircuitBreaker {
}
catch (CallAbortedException e) {
LOGGER.debug("PolarisCircuitBreaker CallAbortedException: {}", e.getMessage());
PolarisCircuitBreakerUtils.reportStatus(consumerAPI, conf, e);
return fallback.apply(e);
}
catch (Exception e) {
@ -65,4 +76,5 @@ public class PolarisCircuitBreaker implements CircuitBreaker {
}
}
}

@ -21,6 +21,7 @@ import java.util.function.Function;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisCircuitBreakerConfigBuilder;
import com.tencent.cloud.polaris.circuitbreaker.util.PolarisCircuitBreakerUtils;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import org.springframework.cloud.client.circuitbreaker.CircuitBreaker;
@ -47,15 +48,18 @@ public class PolarisCircuitBreakerFactory
private final CircuitBreakAPI circuitBreakAPI;
public PolarisCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI) {
private final ConsumerAPI consumerAPI;
public PolarisCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI, ConsumerAPI consumerAPI) {
this.circuitBreakAPI = circuitBreakAPI;
this.consumerAPI = consumerAPI;
}
@Override
public CircuitBreaker create(String id) {
PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf = getConfigurations()
.computeIfAbsent(id, defaultConfiguration);
return new PolarisCircuitBreaker(conf, circuitBreakAPI);
return new PolarisCircuitBreaker(conf, consumerAPI, circuitBreakAPI);
}
@Override

@ -22,11 +22,16 @@ import java.util.function.Function;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisCircuitBreakerConfigBuilder;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisResultToErrorCode;
import com.tencent.cloud.polaris.circuitbreaker.reactor.PolarisCircuitBreakerReactorTransformer;
import com.tencent.cloud.polaris.circuitbreaker.util.PolarisCircuitBreakerUtils;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.api.InvokeHandler;
import com.tencent.polaris.circuitbreak.api.pojo.FunctionalDecoratorRequest;
import com.tencent.polaris.circuitbreak.api.pojo.InvokeContext;
import com.tencent.polaris.circuitbreak.client.exception.CallAbortedException;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -41,10 +46,18 @@ public class ReactivePolarisCircuitBreaker implements ReactiveCircuitBreaker {
private final InvokeHandler invokeHandler;
public ReactivePolarisCircuitBreaker(PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf, CircuitBreakAPI circuitBreakAPI) {
private final ConsumerAPI consumerAPI;
private final PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf;
public ReactivePolarisCircuitBreaker(PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf,
ConsumerAPI consumerAPI,
CircuitBreakAPI circuitBreakAPI) {
InvokeContext.RequestContext requestContext = new FunctionalDecoratorRequest(new ServiceKey(conf.getNamespace(), conf.getService()), conf.getMethod());
requestContext.setSourceService(new ServiceKey(conf.getSourceNamespace(), conf.getSourceService()));
requestContext.setResultToErrorCode(new PolarisResultToErrorCode());
this.consumerAPI = consumerAPI;
this.conf = conf;
this.invokeHandler = circuitBreakAPI.makeInvokeHandler(requestContext);
}
@ -53,7 +66,12 @@ public class ReactivePolarisCircuitBreaker implements ReactiveCircuitBreaker {
public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {
Mono<T> toReturn = toRun.transform(new PolarisCircuitBreakerReactorTransformer<>(invokeHandler));
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
toReturn = toReturn.onErrorResume(throwable -> {
if (throwable instanceof CallAbortedException) {
PolarisCircuitBreakerUtils.reportStatus(consumerAPI, conf, (CallAbortedException) throwable);
}
return fallback.apply(throwable);
});
}
return toReturn;
}
@ -62,7 +80,12 @@ public class ReactivePolarisCircuitBreaker implements ReactiveCircuitBreaker {
public <T> Flux<T> run(Flux<T> toRun, Function<Throwable, Flux<T>> fallback) {
Flux<T> toReturn = toRun.transform(new PolarisCircuitBreakerReactorTransformer<>(invokeHandler));
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
toReturn = toReturn.onErrorResume(throwable -> {
if (throwable instanceof CallAbortedException) {
PolarisCircuitBreakerUtils.reportStatus(consumerAPI, conf, (CallAbortedException) throwable);
}
return fallback.apply(throwable);
});
}
return toReturn;
}

@ -21,6 +21,7 @@ import java.util.function.Function;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisCircuitBreakerConfigBuilder;
import com.tencent.cloud.polaris.circuitbreaker.util.PolarisCircuitBreakerUtils;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import org.springframework.cloud.client.circuitbreaker.ReactiveCircuitBreaker;
@ -46,8 +47,11 @@ public class ReactivePolarisCircuitBreakerFactory extends
private final CircuitBreakAPI circuitBreakAPI;
public ReactivePolarisCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI) {
private final ConsumerAPI consumerAPI;
public ReactivePolarisCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI, ConsumerAPI consumerAPI) {
this.circuitBreakAPI = circuitBreakAPI;
this.consumerAPI = consumerAPI;
}
@ -55,7 +59,7 @@ public class ReactivePolarisCircuitBreakerFactory extends
public ReactiveCircuitBreaker create(String id) {
PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf = getConfigurations()
.computeIfAbsent(id, defaultConfiguration);
return new ReactivePolarisCircuitBreaker(conf, circuitBreakAPI);
return new ReactivePolarisCircuitBreaker(conf, consumerAPI, circuitBreakAPI);
}
@Override

@ -25,6 +25,7 @@ import com.tencent.cloud.polaris.circuitbreaker.common.CircuitBreakerConfigModif
import com.tencent.cloud.polaris.circuitbreaker.resttemplate.PolarisCircuitBreakerRestTemplateBeanPostProcessor;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementAutoConfiguration;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.factory.CircuitBreakAPIFactory;
import com.tencent.polaris.client.api.SDKContext;
@ -62,8 +63,8 @@ public class PolarisCircuitBreakerAutoConfiguration {
@Bean
@ConditionalOnMissingBean(CircuitBreakerFactory.class)
public CircuitBreakerFactory polarisCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI) {
PolarisCircuitBreakerFactory factory = new PolarisCircuitBreakerFactory(circuitBreakAPI);
public CircuitBreakerFactory polarisCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI, ConsumerAPI consumerAPI) {
PolarisCircuitBreakerFactory factory = new PolarisCircuitBreakerFactory(circuitBreakAPI, consumerAPI);
customizers.forEach(customizer -> customizer.customize(factory));
return factory;
}

@ -24,6 +24,7 @@ import com.tencent.cloud.polaris.circuitbreaker.ReactivePolarisCircuitBreakerFac
import com.tencent.cloud.polaris.circuitbreaker.common.CircuitBreakerConfigModifier;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementAutoConfiguration;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.factory.CircuitBreakAPIFactory;
import com.tencent.polaris.client.api.SDKContext;
@ -60,8 +61,8 @@ public class ReactivePolarisCircuitBreakerAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ReactiveCircuitBreakerFactory.class)
public ReactiveCircuitBreakerFactory polarisReactiveCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI) {
ReactivePolarisCircuitBreakerFactory factory = new ReactivePolarisCircuitBreakerFactory(circuitBreakAPI);
public ReactiveCircuitBreakerFactory polarisReactiveCircuitBreakerFactory(CircuitBreakAPI circuitBreakAPI, ConsumerAPI consumerAPI) {
ReactivePolarisCircuitBreakerFactory factory = new ReactivePolarisCircuitBreakerFactory(circuitBreakAPI, consumerAPI);
customizers.forEach(customizer -> customizer.customize(factory));
return factory;
}

@ -19,6 +19,14 @@ package com.tencent.cloud.polaris.circuitbreaker.util;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisCircuitBreakerConfigBuilder;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.circuitbreak.client.exception.CallAbortedException;
import com.tencent.polaris.discovery.client.api.DefaultConsumerAPI;
import org.apache.commons.lang.StringUtils;
import org.springframework.util.Assert;
/**
@ -51,4 +59,22 @@ public final class PolarisCircuitBreakerUtils {
return new String[]{MetadataContext.LOCAL_NAMESPACE, id, ""};
}
public static void reportStatus(ConsumerAPI consumerAPI,
PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf, CallAbortedException e) {
ServiceCallResult result = new ServiceCallResult();
result.setMethod(conf.getMethod());
result.setNamespace(conf.getNamespace());
result.setService(conf.getService());
result.setRuleName(e.getRuleName());
result.setRetStatus(RetStatus.RetReject);
result.setCallerService(new ServiceKey(conf.getSourceNamespace(), conf.getSourceService()));
String callerIp = ((DefaultConsumerAPI) consumerAPI).getSDKContext().getConfig().getGlobal().getAPI().getBindIP();
if (StringUtils.isNotBlank(callerIp)) {
result.setCallerIp(callerIp);
}
consumerAPI.updateServiceCallResult(result);
}
}

@ -39,6 +39,7 @@ import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.factory.CircuitBreakAPIFactory;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.client.util.Utils;
import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto;
import com.tencent.polaris.test.common.TestUtils;
@ -175,9 +176,9 @@ public class PolarisCircuitBreakerIntegrationTest {
@Bean
@PolarisCircuitBreaker(fallback = "fallback")
public RestTemplate defaultRestTemplate(RpcEnhancementReporterProperties properties, ConsumerAPI consumerAPI) {
public RestTemplate defaultRestTemplate(RpcEnhancementReporterProperties properties, SDKContext context, ConsumerAPI consumerAPI) {
RestTemplate defaultRestTemplate = new RestTemplate();
EnhancedRestTemplateReporter enhancedRestTemplateReporter = new EnhancedRestTemplateReporter(properties, consumerAPI);
EnhancedRestTemplateReporter enhancedRestTemplateReporter = new EnhancedRestTemplateReporter(properties, context, consumerAPI);
defaultRestTemplate.setErrorHandler(enhancedRestTemplateReporter);
return defaultRestTemplate;
}

@ -31,10 +31,12 @@ import java.util.stream.Collectors;
import com.google.protobuf.util.JsonFormat;
import com.tencent.cloud.common.util.ApplicationContextAwareUtils;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.circuitbreak.factory.CircuitBreakAPIFactory;
import com.tencent.polaris.client.util.Utils;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.specification.api.v1.fault.tolerance.CircuitBreakerProto;
import com.tencent.polaris.test.common.TestUtils;
import com.tencent.polaris.test.mock.discovery.NamingServer;
@ -104,8 +106,9 @@ public class PolarisCircuitBreakerMockServerTest {
public void testCircuitBreaker() {
Configuration configuration = TestUtils.configWithEnvAddress();
CircuitBreakAPI circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByConfig(configuration);
ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByConfig(configuration);
PolarisCircuitBreakerFactory polarisCircuitBreakerFactory = new PolarisCircuitBreakerFactory(circuitBreakAPI);
PolarisCircuitBreakerFactory polarisCircuitBreakerFactory = new PolarisCircuitBreakerFactory(circuitBreakAPI, consumerAPI);
CircuitBreaker cb = polarisCircuitBreakerFactory.create(SERVICE_CIRCUIT_BREAKER);
// trigger fallback for 5 times
@ -126,7 +129,7 @@ public class PolarisCircuitBreakerMockServerTest {
assertThat(resList).isEqualTo(Arrays.asList("invoke success", "fallback", "fallback", "fallback", "fallback"));
// always fallback
ReactivePolarisCircuitBreakerFactory reactivePolarisCircuitBreakerFactory = new ReactivePolarisCircuitBreakerFactory(circuitBreakAPI);
ReactivePolarisCircuitBreakerFactory reactivePolarisCircuitBreakerFactory = new ReactivePolarisCircuitBreakerFactory(circuitBreakAPI, consumerAPI);
ReactiveCircuitBreaker rcb = reactivePolarisCircuitBreakerFactory.create(SERVICE_CIRCUIT_BREAKER);
assertThat(Mono.just("foobar").transform(it -> rcb.run(it, t -> Mono.just("fallback")))

@ -18,6 +18,10 @@
package com.tencent.cloud.polaris.loadbalancer;
import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled;
import com.tencent.cloud.polaris.loadbalancer.reactive.PolarisLoadBalancerClientRequestTransformer;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.router.api.core.RouterAPI;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@ -29,6 +33,7 @@ import org.springframework.cloud.client.ConditionalOnReactiveDiscoveryEnabled;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
@ -95,4 +100,12 @@ public class PolarisLoadBalancerClientConfiguration {
ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().build(context));
}
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(
SDKContext sdkContext) {
ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
return new PolarisLoadBalancerClientRequestTransformer(consumerAPI);
}
}

@ -0,0 +1,43 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.polaris.loadbalancer.reactive;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.polaris.api.core.ConsumerAPI;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientRequest;
public class PolarisLoadBalancerClientRequestTransformer implements LoadBalancerClientRequestTransformer {
private final ConsumerAPI consumerAPI;
public PolarisLoadBalancerClientRequestTransformer(ConsumerAPI consumerAPI) {
this.consumerAPI = consumerAPI;
}
@Override
public ClientRequest transformRequest(ClientRequest request, ServiceInstance instance) {
if (instance != null) {
HttpHeaders headers = request.headers();
headers.add(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID, instance.getServiceId());
}
return request;
}
}

@ -25,6 +25,7 @@ import java.util.Set;
import javax.annotation.PostConstruct;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.polaris.ratelimit.config.PolarisRateLimitProperties;
import com.tencent.cloud.polaris.ratelimit.constant.RateLimitConstant;
@ -32,6 +33,7 @@ import com.tencent.cloud.polaris.ratelimit.resolver.RateLimitRuleArgumentReactiv
import com.tencent.cloud.polaris.ratelimit.spi.PolarisRateLimiterLimitedFallback;
import com.tencent.cloud.polaris.ratelimit.utils.QuotaCheckUtils;
import com.tencent.cloud.polaris.ratelimit.utils.RateLimitUtils;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.ratelimit.api.core.LimitAPI;
import com.tencent.polaris.ratelimit.api.rpc.Argument;
import com.tencent.polaris.ratelimit.api.rpc.QuotaResponse;
@ -116,6 +118,10 @@ public class QuotaCheckReactiveFilter implements WebFilter, Ordered {
dataBuffer = response.bufferFactory().allocateBuffer()
.write(rejectTips.getBytes(StandardCharsets.UTF_8));
}
response.getHeaders().add(HeaderConstant.INTERNAL_CALLEE_RET_STATUS, RetStatus.RetFlowControl.getDesc());
if (Objects.nonNull(quotaResponse.getActiveRule())) {
response.getHeaders().add(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME, quotaResponse.getActiveRule().getName().getValue());
}
return response.writeWith(Mono.just(dataBuffer));
}
// Unirate

@ -28,6 +28,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.polaris.ratelimit.config.PolarisRateLimitProperties;
import com.tencent.cloud.polaris.ratelimit.constant.RateLimitConstant;
@ -35,6 +36,7 @@ import com.tencent.cloud.polaris.ratelimit.resolver.RateLimitRuleArgumentServlet
import com.tencent.cloud.polaris.ratelimit.spi.PolarisRateLimiterLimitedFallback;
import com.tencent.cloud.polaris.ratelimit.utils.QuotaCheckUtils;
import com.tencent.cloud.polaris.ratelimit.utils.RateLimitUtils;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.ratelimit.api.core.LimitAPI;
import com.tencent.polaris.ratelimit.api.rpc.Argument;
import com.tencent.polaris.ratelimit.api.rpc.QuotaResponse;
@ -111,6 +113,10 @@ public class QuotaCheckServletFilter extends OncePerRequestFilter {
response.setContentType("text/html;charset=UTF-8");
response.getWriter().write(rejectTips);
}
response.addHeader(HeaderConstant.INTERNAL_CALLEE_RET_STATUS, RetStatus.RetFlowControl.getDesc());
if (Objects.nonNull(quotaResponse.getActiveRule())) {
response.addHeader(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME, quotaResponse.getActiveRule().getName().getValue());
}
return;
}
// Unirate

@ -0,0 +1,42 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.common.constant;
/**
* Built-in system http header fields
*/
public final class HeaderConstant {
/**
* The called service returns the real call result of its own processing request
*/
public static final String INTERNAL_CALLEE_RET_STATUS = "internal-callee-retstatus";
/**
* The name of the rule that the current limit/circiutbreaker rule takes effect
*/
public static final String INTERNAL_ACTIVE_RULE_NAME = "internal-callee-activerule";
/**
* The name information of the called service
*/
public static final String INTERNAL_CALLEE_SERVICE_ID = "internal-callee-serviceid";
private HeaderConstant() {
}
}

@ -73,7 +73,7 @@
<revision>1.11.0-2021.0.6-SNAPSHOT</revision>
<!-- Dependencies -->
<polaris.version>1.11.3-SNAPSHOT</polaris.version>
<polaris.version>1.12.0-SNAPSHOT</polaris.version>
<guava.version>31.0.1-jre</guava.version>
<logback.version>1.2.11</logback.version>
<mocktio.version>4.5.1</mocktio.version>

@ -27,12 +27,14 @@
- 在北极星服务端可以通过控制台在命名空间Production下添加服务RateLimitCalleeService。
- 启动服务被调方:
1. IDE直接启动找到主类 `RateLimitCalleeService`,执行 main 方法启动应用。
2. 打包编译后启动:首先执行 `mvn clean package` 将工程编译打包,然后执行 `java -jar ratelimit-callee-sevice-${verion}.jar`启动应用。
2. 打包编译后启动:首先执行 `mvn clean package` 将工程编译打包
- 执行 `java -jar ratelimit-callee-sevice-${verion}.jar`启动应用
- 执行 `java -jar ratelimit-caller-sevice-${verion}.jar`启动应用
- 启动后,可以在北极星控制台上看到注册上来的服务实例信息。
3. 调用服务
通过浏览器访问http://127.0.0.1:48081/business/invoke可以看到以下输出信息
通过浏览器访问http://127.0.0.1:58080/business/invoke可以看到以下输出信息
````
hello world for ratelimit service 1
hello world for ratelimit service 2

@ -32,13 +32,14 @@ Examples provided by Polaris all support to run at IDE, or compile and run with
- Launch callee server:
1. Launch IDE directly: First find `RateLimitCalleeService`, execute main then launch application
2. compile package then launch: first execute `mvn clean package` compile the package, then execute `java -jar ratelimit-callee-sevice-${verion}.jar` execute the application
2. compile package then launch: first execute `mvn clean package` compile the package
- then execute `java -jar ratelimit-callee-sevice-${verion}.jar` execute the application
- then execute `java -jar ratelimit-caller-sevice-${verion}.jar` execute the application
- After launching, one can watch server instance from Polaris control panel
3. Invoke Service
After visiting http://127.0.0.1:48081/business/invoke, one can see the following information:
After visiting http://127.0.0.1:58080/business/invoke, one can see the following information:
````
hello world for ratelimit service 1
@ -61,7 +62,7 @@ Examples provided by Polaris all support to run at IDE, or compile and run with
![](polaris-ratelimit-ui.png)
5. Verify rate limit result
continue visit http://127.0.0.1:48081/business/invoke, one can see, after 10 invokes, rate limit will start:
continue visit http://127.0.0.1:68080/business/invoke, one can see, after 10 invokes, rate limit will start:
````
hello world for ratelimit service 1

@ -16,6 +16,7 @@
<modules>
<module>ratelimit-callee-service</module>
<module>ratelimit-caller-service</module>
</modules>
<dependencies>

@ -48,6 +48,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
*
* @author Haotian Zhang
*/
@Deprecated
@RestController
@RequestMapping("/business")
public class BusinessController {

@ -5,7 +5,7 @@ spring:
name: RateLimitCalleeService
cloud:
polaris:
address: grpc://183.47.111.80:8091
address: grpc://127.0.0.1:8091
namespace: default
enabled: true
ratelimit:

@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>polaris-ratelimit-example</artifactId>
<groupId>com.tencent.cloud</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>ratelimit-caller-service</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-polaris-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-polaris-ratelimit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,171 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.ratelimit.example.service.caller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.HttpClientErrorException.TooManyRequests;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@RestController
@RequestMapping("/business")
public class Controller {
private static final Logger LOG = LoggerFactory.getLogger(Controller.class);
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicLong lastTimestamp = new AtomicLong(0);
@Autowired
private RestTemplate restTemplate;
@Autowired
private WebClient.Builder webClientBuilder;
private String appName = "RateLimitCalleeService";
/**
* Get information.
* @return information
*/
@GetMapping("/info")
public String info() {
return "hello world for ratelimit service " + index.incrementAndGet();
}
@GetMapping("/info/webclient")
public Mono<String> infoWebClient() {
return Mono.just("hello world for ratelimit service " + index.incrementAndGet());
}
@GetMapping("/invoke/webclient")
public String invokeInfoWebClient() throws InterruptedException, ExecutionException {
StringBuffer builder = new StringBuffer();
WebClient webClient = webClientBuilder.baseUrl("http://" + appName).build();
List<Mono<String>> monoList = new ArrayList<>();
for (int i = 0; i < 30; i++) {
Mono<String> response = webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/business/info/webclient")
.queryParam("yyy", "yyy")
.build()
)
.header("xxx", "xxx")
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(s -> builder.append(s + "\n"))
.doOnError(e -> {
if (e instanceof WebClientResponseException) {
if (((WebClientResponseException) e).getRawStatusCode() == 429) {
builder.append("TooManyRequests ").append(index.incrementAndGet() + "\n");
}
}
})
.onErrorReturn("");
monoList.add(response);
}
for (Mono<String> mono : monoList) {
mono.toFuture().get();
}
index.set(0);
return builder.toString();
}
/**
* Get information 30 times per 1 second.
*
* @return result of 30 calls.
* @throws InterruptedException exception
*/
@GetMapping("/invoke")
public String invokeInfo() throws InterruptedException {
StringBuffer builder = new StringBuffer();
CountDownLatch count = new CountDownLatch(30);
for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add("xxx", "xxx");
ResponseEntity<String> entity = restTemplate.exchange(
"http://" + appName + "/business/info?yyy={yyy}",
HttpMethod.GET,
new HttpEntity<>(httpHeaders),
String.class,
"yyy"
);
builder.append(entity.getBody() + "\n");
}
catch (RestClientException e) {
if (e instanceof TooManyRequests) {
builder.append("TooManyRequests ").append(index.incrementAndGet() + "\n");
}
else {
throw e;
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
count.countDown();
}
}).start();
}
count.await();
index.set(0);
return builder.toString();
}
/**
* Get information with unirate.
*
* @return information
*/
@GetMapping("/unirate")
public String unirate() {
long currentTimestamp = System.currentTimeMillis();
long lastTime = lastTimestamp.get();
if (lastTime != 0) {
LOG.info("Current timestamp:" + currentTimestamp + ", diff from last timestamp:" + (currentTimestamp - lastTime));
}
else {
LOG.info("Current timestamp:" + currentTimestamp);
}
lastTimestamp.set(currentTimestamp);
return "hello world for ratelimit service with diff from last request:" + (currentTimestamp - lastTime) + "ms.";
}
}

@ -0,0 +1,51 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.ratelimit.example.service.caller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
@SpringBootApplication
public class RateLimitCallerService {
private static final Logger LOG = LoggerFactory.getLogger(RateLimitCallerService.class);
public static void main(String[] args) {
SpringApplication.run(RateLimitCallerService.class, args);
}
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
@LoadBalanced
@Bean
WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}

@ -0,0 +1,20 @@
server:
port: 58080
spring:
application:
name: RateLimitCallerService
cloud:
polaris:
address: grpc://127.0.0.1:8091
namespace: default
enabled: true
management:
endpoints:
web:
exposure:
include:
- polaris-ratelimit
logging:
level:
com.tencent.cloud.polaris: debug

@ -56,6 +56,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>

@ -116,4 +116,10 @@ public abstract class AbstractPolarisReporterAdapter {
// DEFAULT RETURN FALSE.
return false;
}
public static String convertLabel(String label) {
label = label.replaceAll("\"|\\{|\\}", "")
.replaceAll(",", "|");
return label;
}
}

@ -30,8 +30,10 @@ import com.tencent.cloud.rpc.enhancement.feign.plugin.reporter.ExceptionPolarisR
import com.tencent.cloud.rpc.enhancement.feign.plugin.reporter.SuccessPolarisReporter;
import com.tencent.cloud.rpc.enhancement.resttemplate.BlockingLoadBalancerClientAspect;
import com.tencent.cloud.rpc.enhancement.resttemplate.EnhancedRestTemplateReporter;
import com.tencent.cloud.rpc.enhancement.webclient.EnhancedWebClientReporter;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.client.api.SDKContext;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.RootBeanDefinition;
@ -47,6 +49,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Role;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
/**
* Auto Configuration for Polaris {@link feign.Feign} OR {@link RestTemplate} which can automatically bring in the call
@ -89,14 +92,16 @@ public class RpcEnhancementAutoConfiguration {
@Bean
public SuccessPolarisReporter successPolarisReporter(RpcEnhancementReporterProperties properties,
@Autowired(required = false) SDKContext context,
@Autowired(required = false) ConsumerAPI consumerAPI) {
return new SuccessPolarisReporter(properties, consumerAPI);
return new SuccessPolarisReporter(properties, context, consumerAPI);
}
@Bean
public ExceptionPolarisReporter exceptionPolarisReporter(RpcEnhancementReporterProperties properties,
@Autowired(required = false) SDKContext context,
@Autowired(required = false) ConsumerAPI consumerAPI) {
return new ExceptionPolarisReporter(properties, consumerAPI);
return new ExceptionPolarisReporter(properties, context, consumerAPI);
}
}
}
@ -117,8 +122,8 @@ public class RpcEnhancementAutoConfiguration {
@Bean
public EnhancedRestTemplateReporter enhancedRestTemplateReporter(
RpcEnhancementReporterProperties properties, ConsumerAPI consumerAPI) {
return new EnhancedRestTemplateReporter(properties, consumerAPI);
RpcEnhancementReporterProperties properties, SDKContext context, ConsumerAPI consumerAPI) {
return new EnhancedRestTemplateReporter(properties, context, consumerAPI);
}
@Bean
@ -137,4 +142,20 @@ public class RpcEnhancementAutoConfiguration {
return new BlockingLoadBalancerClientAspect();
}
}
/**
* Configuration for Polaris {@link org.springframework.web.reactive.function.client.WebClient} which can automatically bring in the call
* results for reporting.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = "org.springframework.web.reactive.function.client.WebClient")
protected static class PolarisWebClientAutoConfiguration {
@Bean
public ExchangeFilterFunction exchangeFilterFunction(
RpcEnhancementReporterProperties properties, SDKContext context, ConsumerAPI consumerAPI) {
return new EnhancedWebClientReporter(properties, consumerAPI);
}
}
}

@ -26,6 +26,7 @@ import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignPluginType;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.client.api.SDKContext;
import feign.Request;
import feign.Response;
import org.slf4j.Logger;
@ -45,9 +46,13 @@ public class ExceptionPolarisReporter implements EnhancedFeignPlugin {
private final ConsumerAPI consumerAPI;
private final SDKContext context;
public ExceptionPolarisReporter(RpcEnhancementReporterProperties reporterProperties,
SDKContext context,
ConsumerAPI consumerAPI) {
this.reporterProperties = reporterProperties;
this.context = context;
this.consumerAPI = consumerAPI;
}
@ -78,7 +83,7 @@ public class ExceptionPolarisReporter implements EnhancedFeignPlugin {
}
LOG.debug("Will report result of {}. Request=[{} {}]. Response=[{}]. Delay=[{}]ms.", retStatus.name(), request.httpMethod()
.name(), request.url(), response.status(), delay);
ServiceCallResult resultRequest = ReporterUtils.createServiceCallResult(request, response, delay, retStatus);
ServiceCallResult resultRequest = ReporterUtils.createServiceCallResult(this.context, request, response, delay, retStatus);
consumerAPI.updateServiceCallResult(resultRequest);
}
}

@ -20,14 +20,19 @@ package com.tencent.cloud.rpc.enhancement.feign.plugin.reporter;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.constant.RouterConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.rpc.enhancement.AbstractPolarisReporterAdapter;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.client.api.SDKContext;
import feign.Request;
import feign.RequestTemplate;
import feign.Response;
@ -49,7 +54,7 @@ public final class ReporterUtils {
private ReporterUtils() {
}
public static ServiceCallResult createServiceCallResult(final Request request, final Response response, long delay, RetStatus retStatus) {
public static ServiceCallResult createServiceCallResult(final SDKContext context, final Request request, final Response response, long delay, RetStatus retStatus) {
ServiceCallResult resultRequest = new ServiceCallResult();
resultRequest.setNamespace(MetadataContext.LOCAL_NAMESPACE);
@ -65,18 +70,25 @@ public final class ReporterUtils {
catch (UnsupportedEncodingException e) {
LOGGER.error("unsupported charset exception " + UTF_8, e);
}
resultRequest.setLabels(convertLabel(label));
resultRequest.setLabels(AbstractPolarisReporterAdapter.convertLabel(label));
}
URI uri = URI.create(request.url());
resultRequest.setMethod(uri.getPath());
resultRequest.setRetCode(response.status());
resultRequest.setRetStatus(retStatus);
resultRequest.setRetStatus(getRetStatusFromRequest(response, retStatus));
resultRequest.setDelay(delay);
String sourceNamespace = MetadataContext.LOCAL_NAMESPACE;
String sourceService = MetadataContext.LOCAL_SERVICE;
if (StringUtils.isNotBlank(sourceNamespace) && StringUtils.isNotBlank(sourceService)) {
resultRequest.setCallerService(new ServiceKey(sourceNamespace, sourceService));
}
if (StringUtils.isNotBlank(context.getConfig().getGlobal().getAPI().getBindIP())) {
resultRequest.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP());
}
String ruleName = getActiveRuleNameFromRequest(response);
if (StringUtils.isNotBlank(ruleName)) {
resultRequest.setRuleName(ruleName);
}
resultRequest.setHost(uri.getHost());
// -1 means access directly by url, and use http default port number 80
resultRequest.setPort(uri.getPort() == -1 ? 80 : uri.getPort());
@ -84,9 +96,30 @@ public final class ReporterUtils {
return resultRequest;
}
private static String convertLabel(String label) {
label = label.replaceAll("\"|\\{|\\}", "")
.replaceAll(",", "|");
return label;
private static RetStatus getRetStatusFromRequest(Response response, RetStatus defaultVal) {
if (response.headers().containsKey(HeaderConstant.INTERNAL_CALLEE_RET_STATUS)) {
Collection<String> values = response.headers().get(HeaderConstant.INTERNAL_CALLEE_RET_STATUS);
if (CollectionUtils.isNotEmpty(values)) {
String retStatusVal = com.tencent.polaris.api.utils.StringUtils.defaultString(new ArrayList<>(values).get(0));
if (Objects.equals(retStatusVal, RetStatus.RetFlowControl.getDesc())) {
return RetStatus.RetFlowControl;
}
if (Objects.equals(retStatusVal, RetStatus.RetReject.getDesc())) {
return RetStatus.RetReject;
}
}
}
return defaultVal;
}
private static String getActiveRuleNameFromRequest(Response response) {
if (response.headers().containsKey(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME)) {
Collection<String> values = response.headers().get(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME);
if (CollectionUtils.isNotEmpty(values)) {
String val = com.tencent.polaris.api.utils.StringUtils.defaultString(new ArrayList<>(values).get(0));
return val;
}
}
return "";
}
}

@ -25,6 +25,7 @@ import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignPluginType;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.client.api.SDKContext;
import feign.Request;
import feign.Response;
import org.slf4j.Logger;
@ -44,8 +45,11 @@ public class SuccessPolarisReporter extends AbstractPolarisReporterAdapter imple
private final ConsumerAPI consumerAPI;
public SuccessPolarisReporter(RpcEnhancementReporterProperties properties, ConsumerAPI consumerAPI) {
private final SDKContext context;
public SuccessPolarisReporter(RpcEnhancementReporterProperties properties, SDKContext context, ConsumerAPI consumerAPI) {
super(properties);
this.context = context;
this.consumerAPI = consumerAPI;
}
@ -75,7 +79,7 @@ public class SuccessPolarisReporter extends AbstractPolarisReporterAdapter imple
}
LOG.debug("Will report result of {}. Request=[{} {}]. Response=[{}]. Delay=[{}]ms.", retStatus.name(), request.httpMethod()
.name(), request.url(), response.status(), delay);
ServiceCallResult resultRequest = ReporterUtils.createServiceCallResult(request, response, delay, retStatus);
ServiceCallResult resultRequest = ReporterUtils.createServiceCallResult(this.context, request, response, delay, retStatus);
consumerAPI.updateServiceCallResult(resultRequest);
}
}

@ -21,9 +21,13 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.constant.RouterConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
@ -34,6 +38,7 @@ import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.client.api.SDKContext;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,10 +71,12 @@ public class EnhancedRestTemplateReporter extends AbstractPolarisReporterAdapter
public static final String HEADER_HAS_ERROR = "X-SCT-Has-Error";
private static final Logger LOGGER = LoggerFactory.getLogger(EnhancedRestTemplateReporter.class);
private final ConsumerAPI consumerAPI;
private final SDKContext context;
private ResponseErrorHandler delegateHandler;
public EnhancedRestTemplateReporter(RpcEnhancementReporterProperties properties, ConsumerAPI consumerAPI) {
public EnhancedRestTemplateReporter(RpcEnhancementReporterProperties properties, SDKContext context, ConsumerAPI consumerAPI) {
super(properties);
this.context = context;
this.consumerAPI = consumerAPI;
}
@ -149,6 +156,9 @@ public class EnhancedRestTemplateReporter extends AbstractPolarisReporterAdapter
if (apply(response.getStatusCode())) {
resultRequest.setRetStatus(RetStatus.RetFail);
}
resultRequest.setRetStatus(getRetStatusFromRequest(response, resultRequest.getRetStatus()));
resultRequest.setRuleName(getActiveRuleNameFromRequest(response));
resultRequest.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP());
List<String> labels = response.getHeaders().get(RouterConstant.ROUTER_LABEL_HEADER);
if (CollectionUtils.isNotEmpty(labels)) {
@ -172,12 +182,6 @@ public class EnhancedRestTemplateReporter extends AbstractPolarisReporterAdapter
}
}
private String convertLabel(String label) {
label = label.replaceAll("\"|\\{|\\}", "")
.replaceAll(",", "|");
return label;
}
private void invokeDelegateHandler(URI url, HttpMethod method, ClientHttpResponse response) throws IOException {
if (realHasError(response)) {
delegateHandler.handleError(url, method, response);
@ -227,4 +231,31 @@ public class EnhancedRestTemplateReporter extends AbstractPolarisReporterAdapter
protected void setDelegateHandler(ResponseErrorHandler delegateHandler) {
this.delegateHandler = delegateHandler;
}
private static RetStatus getRetStatusFromRequest(ClientHttpResponse response, RetStatus defaultVal) {
if (response.getHeaders().containsKey(HeaderConstant.INTERNAL_CALLEE_RET_STATUS)) {
List<String> values = response.getHeaders().get(HeaderConstant.INTERNAL_CALLEE_RET_STATUS);
if (CollectionUtils.isNotEmpty(values)) {
String retStatusVal = com.tencent.polaris.api.utils.StringUtils.defaultString(values.get(0));
if (Objects.equals(retStatusVal, RetStatus.RetFlowControl.getDesc())) {
return RetStatus.RetFlowControl;
}
if (Objects.equals(retStatusVal, RetStatus.RetReject.getDesc())) {
return RetStatus.RetReject;
}
}
}
return defaultVal;
}
private static String getActiveRuleNameFromRequest(ClientHttpResponse response) {
if (response.getHeaders().containsKey(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME)) {
Collection<String> values = response.getHeaders().get(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME);
if (CollectionUtils.isNotEmpty(values)) {
String val = com.tencent.polaris.api.utils.StringUtils.defaultString(new ArrayList<>(values).get(0));
return val;
}
}
return "";
}
}

@ -0,0 +1,170 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.rpc.enhancement.webclient;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.constant.RouterConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.rpc.enhancement.AbstractPolarisReporterAdapter;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.discovery.client.api.DefaultConsumerAPI;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import java.io.UnsupportedEncodingException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import static com.tencent.cloud.common.constant.ContextConstant.UTF_8;
public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter implements ExchangeFilterFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(EnhancedWebClientReporter.class);
private static final String METRICS_WEBCLIENT_START_TIME = EnhancedWebClientReporter.class.getName()
+ ".START_TIME";
private final ConsumerAPI consumerAPI;
private final SDKContext context;
public EnhancedWebClientReporter(RpcEnhancementReporterProperties reportProperties, ConsumerAPI consumerAPI) {
super(reportProperties);
this.context = ((DefaultConsumerAPI) consumerAPI).getSDKContext();
this.consumerAPI = consumerAPI;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
return next.exchange(request).as((responseMono) -> instrumentResponse(request, responseMono))
.contextWrite(this::putStartTime);
}
private Mono<ClientResponse> instrumentResponse(ClientRequest request, Mono<ClientResponse> responseMono) {
return Mono.deferContextual((ctx) -> responseMono.doOnEach((signal) -> {
// report result to polaris
if (reportProperties.isEnabled()) {
return;
}
ServiceCallResult callResult = new ServiceCallResult();
Long startTime = getStartTime(ctx);
callResult.setDelay(System.currentTimeMillis() - startTime);
callResult.setNamespace(MetadataContext.LOCAL_NAMESPACE);
callResult.setService(request.headers().getFirst(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID));
String sourceNamespace = MetadataContext.LOCAL_NAMESPACE;
String sourceService = MetadataContext.LOCAL_SERVICE;
if (StringUtils.isNotBlank(sourceNamespace) && StringUtils.isNotBlank(sourceService)) {
callResult.setCallerService(new ServiceKey(sourceNamespace, sourceService));
}
Collection<String> labels = request.headers().get(RouterConstant.ROUTER_LABEL_HEADER);
if (CollectionUtils.isNotEmpty(labels) && labels.iterator().hasNext()) {
String label = labels.iterator().next();
try {
label = URLDecoder.decode(label, UTF_8);
} catch (UnsupportedEncodingException e) {
LOGGER.error("unsupported charset exception " + UTF_8, e);
}
callResult.setLabels(convertLabel(label));
}
URI uri = request.url();
callResult.setMethod(uri.getPath());
callResult.setHost(uri.getHost());
// -1 means access directly by url, and use http default port number 80
callResult.setPort(uri.getPort() == -1 ? 80 : uri.getPort());
callResult.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP());
RetStatus retStatus = RetStatus.RetSuccess;
ClientResponse response = signal.get();
if (Objects.nonNull(response)) {
callResult.setRuleName(getActiveRuleNameFromRequest(response));
if (apply(response.statusCode())) {
retStatus = RetStatus.RetFail;
}
retStatus = getRetStatusFromRequest(response, retStatus);
}
if (signal.isOnError()) {
Throwable throwable = signal.getThrowable();
if (throwable instanceof SocketTimeoutException) {
retStatus = RetStatus.RetTimeout;
}
}
callResult.setRetStatus(retStatus);
consumerAPI.updateServiceCallResult(callResult);
}));
}
private static RetStatus getRetStatusFromRequest(ClientResponse response, RetStatus defaultVal) {
HttpHeaders headers = response.headers().asHttpHeaders();
if (headers.containsKey(HeaderConstant.INTERNAL_CALLEE_RET_STATUS)) {
List<String> values = headers.get(HeaderConstant.INTERNAL_CALLEE_RET_STATUS);
if (CollectionUtils.isNotEmpty(values)) {
String retStatusVal = com.tencent.polaris.api.utils.StringUtils.defaultString(values.get(0));
if (Objects.equals(retStatusVal, RetStatus.RetFlowControl.getDesc())) {
return RetStatus.RetFlowControl;
}
if (Objects.equals(retStatusVal, RetStatus.RetReject.getDesc())) {
return RetStatus.RetReject;
}
}
}
return defaultVal;
}
private static String getActiveRuleNameFromRequest(ClientResponse response) {
HttpHeaders headers = response.headers().asHttpHeaders();
if (headers.containsKey(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME)) {
List<String> values = headers.get(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME);
if (CollectionUtils.isNotEmpty(values)) {
String val = com.tencent.polaris.api.utils.StringUtils.defaultString(values.get(0));
return val;
}
}
return "";
}
private Long getStartTime(ContextView context) {
return context.get(METRICS_WEBCLIENT_START_TIME);
}
private Context putStartTime(Context context) {
return context.put(METRICS_WEBCLIENT_START_TIME, System.currentTimeMillis());
}
}

@ -25,6 +25,7 @@ import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignPluginType;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.client.api.SDKContext;
import feign.Request;
import feign.Response;
import org.junit.jupiter.api.AfterAll;
@ -63,8 +64,9 @@ public class ExceptionPolarisReporterTest {
@BeforeAll
static void beforeAll() {
SDKContext context = ReporterUtilsTest.mockSDKContext();
mockedReporterUtils = Mockito.mockStatic(ReporterUtils.class);
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(context, any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
.thenReturn(mock(ServiceCallResult.class));
}

@ -23,8 +23,12 @@ import java.net.URLEncoder;
import com.tencent.cloud.common.constant.RouterConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.util.ApplicationContextAwareUtils;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.global.APIConfig;
import com.tencent.polaris.api.config.global.GlobalConfig;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.client.api.SDKContext;
import feign.Request;
import feign.RequestTemplate;
import feign.Response;
@ -99,7 +103,7 @@ public class ReporterUtilsTest {
Response response = mock(Response.class);
doReturn(502).when(response).status();
ServiceCallResult serviceCallResult = ReporterUtils.createServiceCallResult(request, response, 10L, RetStatus.RetSuccess);
ServiceCallResult serviceCallResult = ReporterUtils.createServiceCallResult(mockSDKContext(), request, response, 10L, RetStatus.RetSuccess);
assertThat(serviceCallResult.getNamespace()).isEqualTo(NAMESPACE_TEST);
assertThat(serviceCallResult.getService()).isEqualTo(SERVICE_PROVIDER);
assertThat(serviceCallResult.getHost()).isEqualTo("1.1.1.1");
@ -112,4 +116,17 @@ public class ReporterUtilsTest {
assertThat(serviceCallResult.getRetCode()).isEqualTo(502);
assertThat(serviceCallResult.getDelay()).isEqualTo(10L);
}
public static SDKContext mockSDKContext() {
APIConfig apiConfig = mock(APIConfig.class);
doReturn("127.0.0.1").when(apiConfig).getBindIP();
GlobalConfig globalConfig = mock(GlobalConfig.class);
doReturn(apiConfig).when(globalConfig).getAPI();
Configuration configuration = mock(Configuration.class);
doReturn(globalConfig).when(configuration).getGlobal();
SDKContext context = mock(SDKContext.class);
doReturn(configuration).when(context).getConfig();
return context;
}
}

@ -22,9 +22,13 @@ import java.util.HashMap;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignContext;
import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignPluginType;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.global.APIConfig;
import com.tencent.polaris.api.config.global.GlobalConfig;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.client.api.SDKContext;
import feign.Request;
import feign.Response;
import org.junit.jupiter.api.AfterAll;
@ -63,8 +67,9 @@ public class SuccessPolarisReporterTest {
@BeforeAll
static void beforeAll() {
SDKContext context = ReporterUtilsTest.mockSDKContext();
mockedReporterUtils = Mockito.mockStatic(ReporterUtils.class);
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(context, any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
.thenReturn(mock(ServiceCallResult.class));
}

Loading…
Cancel
Save