feat:webclient支持上报调用数据

pull/924/head
chuntaojun 3 years ago
parent ae7f186c92
commit 5d7310a9e7

@ -50,8 +50,8 @@ public class PolarisCircuitBreaker implements CircuitBreaker {
private final ConsumerAPI consumerAPI;
public PolarisCircuitBreaker(PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf,
ConsumerAPI consumerAPI,
CircuitBreakAPI circuitBreakAPI) {
ConsumerAPI consumerAPI,
CircuitBreakAPI circuitBreakAPI) {
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(new ServiceKey(conf.getNamespace(), conf.getService()), conf.getMethod());
makeDecoratorRequest.setSourceService(new ServiceKey(conf.getSourceNamespace(), conf.getSourceService()));
makeDecoratorRequest.setResultToErrorCode(new PolarisResultToErrorCode());
@ -76,5 +76,4 @@ public class PolarisCircuitBreaker implements CircuitBreaker {
}
}
}

@ -30,8 +30,6 @@ import com.tencent.polaris.circuitbreak.api.InvokeHandler;
import com.tencent.polaris.circuitbreak.api.pojo.FunctionalDecoratorRequest;
import com.tencent.polaris.circuitbreak.api.pojo.InvokeContext;
import com.tencent.polaris.circuitbreak.client.exception.CallAbortedException;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -51,8 +49,8 @@ public class ReactivePolarisCircuitBreaker implements ReactiveCircuitBreaker {
private final PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf;
public ReactivePolarisCircuitBreaker(PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf,
ConsumerAPI consumerAPI,
CircuitBreakAPI circuitBreakAPI) {
ConsumerAPI consumerAPI,
CircuitBreakAPI circuitBreakAPI) {
InvokeContext.RequestContext requestContext = new FunctionalDecoratorRequest(new ServiceKey(conf.getNamespace(), conf.getService()), conf.getMethod());
requestContext.setSourceService(new ServiceKey(conf.getSourceNamespace(), conf.getSourceService()));
requestContext.setResultToErrorCode(new PolarisResultToErrorCode());

@ -17,8 +17,9 @@
package com.tencent.cloud.polaris.circuitbreaker.util;
import com.tencent.cloud.common.metadata.MetadataContext;
import java.util.Objects;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.polaris.circuitbreaker.common.PolarisCircuitBreakerConfigBuilder;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
@ -29,9 +30,8 @@ import com.tencent.polaris.discovery.client.api.DefaultConsumerAPI;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import java.util.Objects;
import org.springframework.util.Assert;
/**
* PolarisCircuitBreakerUtils.
@ -57,16 +57,16 @@ public final class PolarisCircuitBreakerUtils {
Assert.hasText(id, "A CircuitBreaker must have an id. Id could be : namespace#service#method or service#method or service");
String[] polarisCircuitBreakerMetaData = id.split("#");
if (polarisCircuitBreakerMetaData.length == 2) {
return new String[]{MetadataContext.LOCAL_NAMESPACE, polarisCircuitBreakerMetaData[0], polarisCircuitBreakerMetaData[1]};
return new String[] {MetadataContext.LOCAL_NAMESPACE, polarisCircuitBreakerMetaData[0], polarisCircuitBreakerMetaData[1]};
}
if (polarisCircuitBreakerMetaData.length == 3) {
return new String[]{polarisCircuitBreakerMetaData[0], polarisCircuitBreakerMetaData[1], polarisCircuitBreakerMetaData[2]};
return new String[] {polarisCircuitBreakerMetaData[0], polarisCircuitBreakerMetaData[1], polarisCircuitBreakerMetaData[2]};
}
return new String[]{MetadataContext.LOCAL_NAMESPACE, id, ""};
return new String[] {MetadataContext.LOCAL_NAMESPACE, id, ""};
}
public static void reportStatus(ConsumerAPI consumerAPI,
PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf, CallAbortedException e) {
PolarisCircuitBreakerConfigBuilder.PolarisCircuitBreakerConfiguration conf, CallAbortedException e) {
try {
ServiceCallResult result = new ServiceCallResult();
result.setMethod(conf.getMethod());
@ -80,13 +80,15 @@ public final class PolarisCircuitBreakerUtils {
result.setRetCode(e.getFallbackInfo().getCode());
}
String callerIp = ((DefaultConsumerAPI) consumerAPI).getSDKContext().getConfig().getGlobal().getAPI().getBindIP();
String callerIp = ((DefaultConsumerAPI) consumerAPI).getSDKContext().getConfig().getGlobal().getAPI()
.getBindIP();
if (StringUtils.isNotBlank(callerIp)) {
result.setCallerIp(callerIp);
}
consumerAPI.updateServiceCallResult(result);
} catch (Throwable ex) {
}
catch (Throwable ex) {
LOG.error("[CircuitBreaker]");
}
}

@ -70,6 +70,13 @@ public class PolarisLoadBalancerClientConfiguration {
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), routerAPI);
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) {
ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
return new PolarisLoadBalancerClientRequestTransformer(consumerAPI);
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
@ -100,12 +107,4 @@ public class PolarisLoadBalancerClientConfiguration {
ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().build(context));
}
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(
SDKContext sdkContext) {
ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
return new PolarisLoadBalancerClientRequestTransformer(consumerAPI);
}
}

@ -19,6 +19,7 @@ package com.tencent.cloud.polaris.loadbalancer.reactive;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.polaris.api.core.ConsumerAPI;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.http.HttpHeaders;

@ -18,22 +18,22 @@
package com.tencent.cloud.common.constant;
/**
* Built-in system http header fields
* Built-in system http header fields.
*/
public final class HeaderConstant {
/**
* The called service returns the real call result of its own processing request
* The called service returns the real call result of its own processing request.
*/
public static final String INTERNAL_CALLEE_RET_STATUS = "internal-callee-retstatus";
/**
* The name of the rule that the current limit/circiutbreaker rule takes effect
* The name of the rule that the current limit/circiutbreaker rule takes effect.
*/
public static final String INTERNAL_ACTIVE_RULE_NAME = "internal-callee-activerule";
/**
* The name information of the called service
* The name information of the called service.
*/
public static final String INTERNAL_CALLEE_SERVICE_ID = "internal-callee-serviceid";

@ -5,7 +5,7 @@ spring:
name: RateLimitCalleeService
cloud:
polaris:
address: grpc://127.0.0.1:8091
address: grpc://183.47.111.80:8091
namespace: default
enabled: true
ratelimit:

@ -17,10 +17,18 @@
package com.tencent.cloud.ratelimit.example.service.caller;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -33,14 +41,6 @@ import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@RestController
@RequestMapping("/business")

@ -19,6 +19,7 @@ package com.tencent.cloud.ratelimit.example.service.caller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;

@ -5,7 +5,7 @@ spring:
name: RateLimitCallerService
cloud:
polaris:
address: grpc://127.0.0.1:8091
address: grpc://183.47.111.80:8091
namespace: default
enabled: true

@ -32,8 +32,8 @@ import com.tencent.cloud.rpc.enhancement.resttemplate.BlockingLoadBalancerClient
import com.tencent.cloud.rpc.enhancement.resttemplate.EnhancedRestTemplateReporter;
import com.tencent.cloud.rpc.enhancement.webclient.EnhancedWebClientReporter;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.client.api.SDKContext;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.RootBeanDefinition;
@ -92,15 +92,15 @@ public class RpcEnhancementAutoConfiguration {
@Bean
public SuccessPolarisReporter successPolarisReporter(RpcEnhancementReporterProperties properties,
@Autowired(required = false) SDKContext context,
@Autowired(required = false) ConsumerAPI consumerAPI) {
@Autowired(required = false) SDKContext context,
@Autowired(required = false) ConsumerAPI consumerAPI) {
return new SuccessPolarisReporter(properties, context, consumerAPI);
}
@Bean
public ExceptionPolarisReporter exceptionPolarisReporter(RpcEnhancementReporterProperties properties,
@Autowired(required = false) SDKContext context,
@Autowired(required = false) ConsumerAPI consumerAPI) {
@Autowired(required = false) SDKContext context,
@Autowired(required = false) ConsumerAPI consumerAPI) {
return new ExceptionPolarisReporter(properties, context, consumerAPI);
}
}

@ -82,7 +82,7 @@ public final class ReporterUtils {
if (StringUtils.isNotBlank(sourceNamespace) && StringUtils.isNotBlank(sourceService)) {
resultRequest.setCallerService(new ServiceKey(sourceNamespace, sourceService));
}
if (StringUtils.isNotBlank(context.getConfig().getGlobal().getAPI().getBindIP())) {
if (Objects.nonNull(context)) {
resultRequest.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP());
}
String ruleName = getActiveRuleNameFromRequest(response);

@ -158,7 +158,9 @@ public class EnhancedRestTemplateReporter extends AbstractPolarisReporterAdapter
}
resultRequest.setRetStatus(getRetStatusFromRequest(response, resultRequest.getRetStatus()));
resultRequest.setRuleName(getActiveRuleNameFromRequest(response));
resultRequest.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP());
if (Objects.nonNull(context)) {
resultRequest.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP());
}
List<String> labels = response.getHeaders().get(RouterConstant.ROUTER_LABEL_HEADER);
if (CollectionUtils.isNotEmpty(labels)) {

@ -17,6 +17,14 @@
package com.tencent.cloud.rpc.enhancement.webclient;
import java.io.UnsupportedEncodingException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.constant.RouterConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
@ -32,22 +40,15 @@ import com.tencent.polaris.discovery.client.api.DefaultConsumerAPI;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import java.io.UnsupportedEncodingException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import static com.tencent.cloud.common.constant.ContextConstant.UTF_8;
@ -68,6 +69,35 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
this.consumerAPI = consumerAPI;
}
private static RetStatus getRetStatusFromRequest(ClientResponse response, RetStatus defaultVal) {
HttpHeaders headers = response.headers().asHttpHeaders();
if (headers.containsKey(HeaderConstant.INTERNAL_CALLEE_RET_STATUS)) {
List<String> values = headers.get(HeaderConstant.INTERNAL_CALLEE_RET_STATUS);
if (CollectionUtils.isNotEmpty(values)) {
String retStatusVal = com.tencent.polaris.api.utils.StringUtils.defaultString(values.get(0));
if (Objects.equals(retStatusVal, RetStatus.RetFlowControl.getDesc())) {
return RetStatus.RetFlowControl;
}
if (Objects.equals(retStatusVal, RetStatus.RetReject.getDesc())) {
return RetStatus.RetReject;
}
}
}
return defaultVal;
}
private static String getActiveRuleNameFromRequest(ClientResponse response) {
HttpHeaders headers = response.headers().asHttpHeaders();
if (headers.containsKey(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME)) {
List<String> values = headers.get(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME);
if (CollectionUtils.isNotEmpty(values)) {
String val = com.tencent.polaris.api.utils.StringUtils.defaultString(values.get(0));
return val;
}
}
return "";
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
return next.exchange(request).as((responseMono) -> instrumentResponse(request, responseMono))
@ -97,7 +127,8 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
String label = labels.iterator().next();
try {
label = URLDecoder.decode(label, UTF_8);
} catch (UnsupportedEncodingException e) {
}
catch (UnsupportedEncodingException e) {
LOGGER.error("unsupported charset exception " + UTF_8, e);
}
callResult.setLabels(convertLabel(label));
@ -131,35 +162,6 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
}));
}
private static RetStatus getRetStatusFromRequest(ClientResponse response, RetStatus defaultVal) {
HttpHeaders headers = response.headers().asHttpHeaders();
if (headers.containsKey(HeaderConstant.INTERNAL_CALLEE_RET_STATUS)) {
List<String> values = headers.get(HeaderConstant.INTERNAL_CALLEE_RET_STATUS);
if (CollectionUtils.isNotEmpty(values)) {
String retStatusVal = com.tencent.polaris.api.utils.StringUtils.defaultString(values.get(0));
if (Objects.equals(retStatusVal, RetStatus.RetFlowControl.getDesc())) {
return RetStatus.RetFlowControl;
}
if (Objects.equals(retStatusVal, RetStatus.RetReject.getDesc())) {
return RetStatus.RetReject;
}
}
}
return defaultVal;
}
private static String getActiveRuleNameFromRequest(ClientResponse response) {
HttpHeaders headers = response.headers().asHttpHeaders();
if (headers.containsKey(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME)) {
List<String> values = headers.get(HeaderConstant.INTERNAL_ACTIVE_RULE_NAME);
if (CollectionUtils.isNotEmpty(values)) {
String val = com.tencent.polaris.api.utils.StringUtils.defaultString(values.get(0));
return val;
}
}
return "";
}
private Long getStartTime(ContextView context) {
return context.get(METRICS_WEBCLIENT_START_TIME);
}

@ -64,9 +64,8 @@ public class ExceptionPolarisReporterTest {
@BeforeAll
static void beforeAll() {
SDKContext context = ReporterUtilsTest.mockSDKContext();
mockedReporterUtils = Mockito.mockStatic(ReporterUtils.class);
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(context, any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(any(SDKContext.class), any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
.thenReturn(mock(ServiceCallResult.class));
}

@ -22,9 +22,6 @@ import java.util.HashMap;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignContext;
import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignPluginType;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.global.APIConfig;
import com.tencent.polaris.api.config.global.GlobalConfig;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.rpc.ServiceCallResult;
@ -67,9 +64,8 @@ public class SuccessPolarisReporterTest {
@BeforeAll
static void beforeAll() {
SDKContext context = ReporterUtilsTest.mockSDKContext();
mockedReporterUtils = Mockito.mockStatic(ReporterUtils.class);
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(context, any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
mockedReporterUtils.when(() -> ReporterUtils.createServiceCallResult(any(SDKContext.class), any(Request.class), any(Response.class), anyLong(), any(RetStatus.class)))
.thenReturn(mock(ServiceCallResult.class));
}

Loading…
Cancel
Save