temp commit

pull/963/head
seanyu 2 years ago
parent 6449e313a8
commit e9e8a09482

@ -18,10 +18,6 @@
package com.tencent.cloud.polaris.loadbalancer; package com.tencent.cloud.polaris.loadbalancer;
import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled; import com.tencent.cloud.polaris.context.ConditionalOnPolarisEnabled;
import com.tencent.cloud.polaris.loadbalancer.reactive.PolarisLoadBalancerClientRequestTransformer;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.factory.api.DiscoveryAPIFactory;
import com.tencent.polaris.router.api.core.RouterAPI; import com.tencent.polaris.router.api.core.RouterAPI;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@ -33,7 +29,6 @@ import org.springframework.cloud.client.ConditionalOnReactiveDiscoveryEnabled;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer; import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
@ -70,13 +65,6 @@ public class PolarisLoadBalancerClientConfiguration {
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), routerAPI); loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), routerAPI);
} }
// @Bean
// @ConditionalOnMissingBean
// public LoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer(SDKContext sdkContext) {
// ConsumerAPI consumerAPI = DiscoveryAPIFactory.createConsumerAPIByContext(sdkContext);
// return new PolarisLoadBalancerClientRequestTransformer(consumerAPI);
// }
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled @ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER) @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)

@ -1,44 +1,44 @@
/* ///*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available. // * Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
* // *
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. // * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* // *
* Licensed under the BSD 3-Clause License (the "License"); // * Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License. // * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at // * You may obtain a copy of the License at
* // *
* https://opensource.org/licenses/BSD-3-Clause // * https://opensource.org/licenses/BSD-3-Clause
* // *
* Unless required by applicable law or agreed to in writing, software distributed // * Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the // * CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License. // * specific language governing permissions and limitations under the License.
*/ // */
//
package com.tencent.cloud.polaris.loadbalancer.reactive; //package com.tencent.cloud.polaris.loadbalancer.reactive;
//
import com.tencent.cloud.common.constant.HeaderConstant; //import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.polaris.api.core.ConsumerAPI; //import com.tencent.polaris.api.core.ConsumerAPI;
//
import org.springframework.cloud.client.ServiceInstance; //import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer; //import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.http.HttpHeaders; //import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientRequest; //import org.springframework.web.reactive.function.client.ClientRequest;
//
public class PolarisLoadBalancerClientRequestTransformer implements LoadBalancerClientRequestTransformer { //public class PolarisLoadBalancerClientRequestTransformer implements LoadBalancerClientRequestTransformer {
//
private final ConsumerAPI consumerAPI; // private final ConsumerAPI consumerAPI;
//
public PolarisLoadBalancerClientRequestTransformer(ConsumerAPI consumerAPI) { // public PolarisLoadBalancerClientRequestTransformer(ConsumerAPI consumerAPI) {
this.consumerAPI = consumerAPI; // this.consumerAPI = consumerAPI;
} // }
//
@Override // @Override
public ClientRequest transformRequest(ClientRequest request, ServiceInstance instance) { // public ClientRequest transformRequest(ClientRequest request, ServiceInstance instance) {
if (instance != null) { // if (instance != null) {
HttpHeaders headers = request.headers(); // HttpHeaders headers = request.headers();
headers.add(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID, instance.getServiceId()); // headers.add(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID, instance.getServiceId());
} // }
return request; // return request;
} // }
} //}

@ -54,6 +54,6 @@ logging:
level: level:
root: info root: info
com.tencent.polaris.discovery.client.flow.RegisterFlow: off com.tencent.polaris.discovery.client.flow.RegisterFlow: off
com.tencent.polaris.plugins.registry.memory.CacheObject: off com.tencent.polaris.plugins.registry: off
com.tencent.cloud.polaris.circuitbreaker: debug com.tencent.cloud: debug

@ -92,6 +92,13 @@ public abstract class AbstractPolarisReporterAdapter {
this.context = context; this.context = context;
} }
public ServiceCallResult createServiceCallResult(
@Nullable String calleeServiceName, URI uri, HttpHeaders requestHeaders, @Nullable HttpHeaders responseHeaders,
@Nullable Integer statusCode, long delay, @Nullable Throwable exception) {
return createServiceCallResult(
calleeServiceName, uri.getHost(), uri.getPort(), uri, requestHeaders, responseHeaders, statusCode, delay, exception);
}
/** /**
* createServiceCallResult * createServiceCallResult
* @param calleeServiceName will pick up url host when null * @param calleeServiceName will pick up url host when null
@ -127,6 +134,13 @@ public abstract class AbstractPolarisReporterAdapter {
return resultRequest; return resultRequest;
} }
public ResourceStat createInstanceResourceStat(
@Nullable String calleeServiceName, URI uri,
@Nullable Integer statusCode, long delay, @Nullable Throwable exception) {
return createInstanceResourceStat(
calleeServiceName, uri.getHost(), uri.getPort(), uri, statusCode, delay, exception);
}
/** /**
* createInstanceResourceStat * createInstanceResourceStat
* @param calleeServiceName will pick up url host when null * @param calleeServiceName will pick up url host when null

@ -29,9 +29,9 @@ import com.tencent.cloud.rpc.enhancement.feign.plugin.EnhancedFeignPlugin;
import com.tencent.cloud.rpc.enhancement.feign.plugin.reporter.ExceptionPolarisReporter; import com.tencent.cloud.rpc.enhancement.feign.plugin.reporter.ExceptionPolarisReporter;
import com.tencent.cloud.rpc.enhancement.feign.plugin.reporter.SuccessPolarisReporter; import com.tencent.cloud.rpc.enhancement.feign.plugin.reporter.SuccessPolarisReporter;
import com.tencent.cloud.rpc.enhancement.resttemplate.BlockingLoadBalancerClientAspect; import com.tencent.cloud.rpc.enhancement.resttemplate.BlockingLoadBalancerClientAspect;
import com.tencent.cloud.rpc.enhancement.webclient.PolarisLoadBalancerClientRequestTransformer;
import com.tencent.cloud.rpc.enhancement.scg.PolarisGatewayReporter; import com.tencent.cloud.rpc.enhancement.scg.PolarisGatewayReporter;
import com.tencent.cloud.rpc.enhancement.resttemplate.PolarisRestTemplateReporter; import com.tencent.cloud.rpc.enhancement.resttemplate.PolarisRestTemplateReporter;
import com.tencent.cloud.rpc.enhancement.scg.EnhancedPolarisHttpHeadersFilter;
import com.tencent.cloud.rpc.enhancement.webclient.EnhancedWebClientReporter; import com.tencent.cloud.rpc.enhancement.webclient.EnhancedWebClientReporter;
import com.tencent.polaris.api.core.ConsumerAPI; import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI; import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
@ -48,13 +48,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Role; import org.springframework.context.annotation.Role;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/** /**
@ -167,6 +165,13 @@ public class RpcEnhancementAutoConfiguration {
@ConditionalOnClass(name = "org.springframework.web.reactive.function.client.WebClient") @ConditionalOnClass(name = "org.springframework.web.reactive.function.client.WebClient")
protected static class PolarisWebClientAutoConfiguration { protected static class PolarisWebClientAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer")
public PolarisLoadBalancerClientRequestTransformer polarisLoadBalancerClientRequestTransformer() {
return new PolarisLoadBalancerClientRequestTransformer();
}
@Autowired(required = false) @Autowired(required = false)
private List<WebClient.Builder> webClientBuilder = Collections.emptyList(); private List<WebClient.Builder> webClientBuilder = Collections.emptyList();
@ -179,7 +184,7 @@ public class RpcEnhancementAutoConfiguration {
} }
@Bean @Bean
public SmartInitializingSingleton addEncodeTransferMetadataFilterForWebClient(EnhancedWebClientReporter reporter) { public SmartInitializingSingleton addEnhancedWebClientReporterForWebClient(EnhancedWebClientReporter reporter) {
return () -> webClientBuilder.forEach(webClient -> { return () -> webClientBuilder.forEach(webClient -> {
webClient.filter(reporter); webClient.filter(reporter);
}); });
@ -195,12 +200,6 @@ public class RpcEnhancementAutoConfiguration {
@Role(RootBeanDefinition.ROLE_INFRASTRUCTURE) @Role(RootBeanDefinition.ROLE_INFRASTRUCTURE)
protected static class PolarisGatewayAutoConfiguration { protected static class PolarisGatewayAutoConfiguration {
@Bean
@ConditionalOnClass(name = {"org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter"})
public HttpHeadersFilter enhancedPolarisHttpHeadersFilter() {
return new EnhancedPolarisHttpHeadersFilter();
}
@Bean @Bean
@ConditionalOnClass(name = "org.springframework.cloud.gateway.filter.GlobalFilter") @ConditionalOnClass(name = "org.springframework.cloud.gateway.filter.GlobalFilter")
public PolarisGatewayReporter polarisGatewayReporter(RpcEnhancementReporterProperties properties, public PolarisGatewayReporter polarisGatewayReporter(RpcEnhancementReporterProperties properties,

@ -96,8 +96,6 @@ public class ExceptionPolarisReporter extends AbstractPolarisReporterAdapter imp
ServiceCallResult resultRequest = createServiceCallResult( ServiceCallResult resultRequest = createServiceCallResult(
request.requestTemplate().feignTarget().name(), request.requestTemplate().feignTarget().name(),
null,
null,
URI.create(request.url()), URI.create(request.url()),
requestHeaders, requestHeaders,
responseHeaders, responseHeaders,
@ -111,8 +109,6 @@ public class ExceptionPolarisReporter extends AbstractPolarisReporterAdapter imp
ResourceStat resourceStat = createInstanceResourceStat( ResourceStat resourceStat = createInstanceResourceStat(
request.requestTemplate().feignTarget().name(), request.requestTemplate().feignTarget().name(),
null,
null,
URI.create(request.url()), URI.create(request.url()),
status, status,
delay, delay,

@ -93,8 +93,6 @@ public class SuccessPolarisReporter extends AbstractPolarisReporterAdapter imple
ServiceCallResult resultRequest = createServiceCallResult( ServiceCallResult resultRequest = createServiceCallResult(
request.requestTemplate().feignTarget().name(), request.requestTemplate().feignTarget().name(),
null,
null,
URI.create(request.url()), URI.create(request.url()),
requestHeaders, requestHeaders,
responseHeaders, responseHeaders,
@ -108,8 +106,6 @@ public class SuccessPolarisReporter extends AbstractPolarisReporterAdapter imple
ResourceStat resourceStat = createInstanceResourceStat( ResourceStat resourceStat = createInstanceResourceStat(
request.requestTemplate().feignTarget().name(), request.requestTemplate().feignTarget().name(),
null,
null,
URI.create(request.url()), URI.create(request.url()),
status, status,
delay, delay,

@ -39,7 +39,6 @@ public final class LoadBalancerClientAspectUtils {
ServiceInstance instance = (ServiceInstance) server; ServiceInstance instance = (ServiceInstance) server;
MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST, instance.getHost()); MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST, instance.getHost());
MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT, String.valueOf(instance.getPort())); MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT, String.valueOf(instance.getPort()));
MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALL_START_TIME, String.valueOf(System.currentTimeMillis()));
} }
} }
} }

@ -1,29 +1,29 @@
/* ///*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available. // * Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
* // *
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. // * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* // *
* Licensed under the BSD 3-Clause License (the "License"); // * Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License. // * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at // * You may obtain a copy of the License at
* // *
* https://opensource.org/licenses/BSD-3-Clause // * https://opensource.org/licenses/BSD-3-Clause
* // *
* Unless required by applicable law or agreed to in writing, software distributed // * Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the // * CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License. // * specific language governing permissions and limitations under the License.
*/ // */
//
package com.tencent.cloud.rpc.enhancement.resttemplate; //package com.tencent.cloud.rpc.enhancement.resttemplate;
//
import org.springframework.web.client.ResponseErrorHandler; //import org.springframework.web.client.ResponseErrorHandler;
//
/** ///**
* Polaris Response Error Handler Definition Of {@link ResponseErrorHandler}. // * Polaris Response Error Handler Definition Of {@link ResponseErrorHandler}.
* // *
* @author wh 2022/6/21 // * @author wh 2022/6/21
*/ // */
public interface PolarisResponseErrorHandler extends ResponseErrorHandler { //public interface PolarisResponseErrorHandler extends ResponseErrorHandler {
//
} //}

@ -69,6 +69,7 @@ public class PolarisRestTemplateReporter extends AbstractPolarisReporterAdapter
return execution.execute(request, body); return execution.execute(request, body);
} }
long startTime = System.currentTimeMillis();
ClientHttpResponse response = null; ClientHttpResponse response = null;
IOException ex = null; IOException ex = null;
try { try {
@ -87,7 +88,7 @@ public class PolarisRestTemplateReporter extends AbstractPolarisReporterAdapter
Map<String, String> loadBalancerContext = MetadataContextHolder.get().getLoadbalancerMetadata(); Map<String, String> loadBalancerContext = MetadataContextHolder.get().getLoadbalancerMetadata();
String targetHost = loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST); String targetHost = loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST);
Integer targetPort = Integer.valueOf(loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT)); Integer targetPort = Integer.valueOf(loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT));
long delay = System.currentTimeMillis() - Long.parseLong(loadBalancerContext.get(HeaderConstant.INTERNAL_CALL_START_TIME)); long delay = System.currentTimeMillis() - startTime;
ServiceCallResult resultRequest = createServiceCallResult( ServiceCallResult resultRequest = createServiceCallResult(
request.getURI().getHost(), request.getURI().getHost(),

@ -15,12 +15,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_LOADBALANCER_RESPONSE_ATTR;
public class PolarisGatewayReporter extends AbstractPolarisReporterAdapter implements GlobalFilter { public class PolarisGatewayReporter extends AbstractPolarisReporterAdapter implements GlobalFilter {
private static final Logger LOG = LoggerFactory.getLogger(PolarisGatewayReporter.class); private static final Logger LOG = LoggerFactory.getLogger(PolarisGatewayReporter.class);
@ -48,37 +52,29 @@ public class PolarisGatewayReporter extends AbstractPolarisReporterAdapter imple
if (!reportProperties.isEnabled()) { if (!reportProperties.isEnabled()) {
return chain.filter(exchange); return chain.filter(exchange);
} }
MetadataContextHolder.get().setLoadbalancer( long startTime = System.currentTimeMillis();
HeaderConstant.INTERNAL_CALLEE_SERVICE_ID,
exchange.getRequest().getHeaders().getFirst(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID)
);
MetadataContextHolder.get().setLoadbalancer(
HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST,
exchange.getRequest().getHeaders().getFirst(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST)
);
MetadataContextHolder.get().setLoadbalancer(
HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT,
exchange.getRequest().getHeaders().getFirst(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT)
);
MetadataContextHolder.get().setLoadbalancer(
HeaderConstant.INTERNAL_CALL_START_TIME,
String.valueOf(System.currentTimeMillis())
);
return chain.filter(exchange) return chain.filter(exchange)
.doOnSuccess(v -> instrumentResponse(exchange, null)) .doOnSuccess(v -> instrumentResponse(exchange, null, startTime))
.doOnError(t -> instrumentResponse(exchange, t)); .doOnError(t -> instrumentResponse(exchange, t, startTime));
} }
private void instrumentResponse(ServerWebExchange exchange, Throwable t) { private void instrumentResponse(ServerWebExchange exchange, Throwable t, long startTime) {
Map<String, String> loadBalancerContext = MetadataContextHolder.get().getLoadbalancerMetadata();
String serviceId = loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID);
String targetHost = loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST);
Integer targetPort = Integer.valueOf(loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT));
long delay = System.currentTimeMillis() - Long.parseLong(loadBalancerContext.get(HeaderConstant.INTERNAL_CALL_START_TIME));
ServerHttpResponse response = exchange.getResponse(); ServerHttpResponse response = exchange.getResponse();
ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest request = exchange.getRequest();
long delay = System.currentTimeMillis() - startTime;
String serviceId = null;
String targetHost = null;
Integer targetPort = null;
Response<ServiceInstance> serviceInstanceResponse = exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR);
if (serviceInstanceResponse != null && serviceInstanceResponse.hasServer()) {
ServiceInstance instance = serviceInstanceResponse.getServer();
serviceId = instance.getServiceId();
targetHost = instance.getHost();
targetPort = instance.getPort();
}
ServiceCallResult resultRequest = createServiceCallResult( ServiceCallResult resultRequest = createServiceCallResult(
serviceId, serviceId,
targetHost, targetHost,

@ -17,35 +17,20 @@
package com.tencent.cloud.rpc.enhancement.webclient; package com.tencent.cloud.rpc.enhancement.webclient;
import java.io.UnsupportedEncodingException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import com.tencent.cloud.common.constant.HeaderConstant; import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.constant.RouterConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder; import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.common.util.RequestLabelUtils;
import com.tencent.cloud.rpc.enhancement.AbstractPolarisReporterAdapter; import com.tencent.cloud.rpc.enhancement.AbstractPolarisReporterAdapter;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties; import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.polaris.api.core.ConsumerAPI; import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.plugin.circuitbreaker.ResourceStat; import com.tencent.polaris.api.plugin.circuitbreaker.ResourceStat;
import com.tencent.polaris.api.pojo.RetStatus;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.ServiceCallResult; import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI; import com.tencent.polaris.circuitbreak.api.CircuitBreakAPI;
import com.tencent.polaris.client.api.SDKContext; import com.tencent.polaris.client.api.SDKContext;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ClientRequest;
@ -53,7 +38,6 @@ import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction; import org.springframework.web.reactive.function.client.ExchangeFunction;
import static com.tencent.cloud.common.constant.ContextConstant.UTF_8;
public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter implements ExchangeFilterFunction { public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter implements ExchangeFilterFunction {
private static final Logger LOG = LoggerFactory.getLogger(EnhancedWebClientReporter.class); private static final Logger LOG = LoggerFactory.getLogger(EnhancedWebClientReporter.class);
@ -75,24 +59,16 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
if (!reportProperties.isEnabled()) { if (!reportProperties.isEnabled()) {
return next.exchange(request); return next.exchange(request);
} }
long startTime = System.currentTimeMillis();
MetadataContextHolder.get().setLoadbalancer(
HeaderConstant.INTERNAL_CALLEE_SERVICE_ID,
request.headers().getFirst(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID)
);
MetadataContextHolder.get().setLoadbalancer(
HeaderConstant.INTERNAL_CALL_START_TIME,
String.valueOf(System.currentTimeMillis())
);
return next.exchange(request) return next.exchange(request)
.doOnSuccess(response -> instrumentResponse(request, response, null)) .doOnSuccess(response -> instrumentResponse(request, response, null, startTime))
.doOnError(t -> instrumentResponse(request, null, t)); .doOnError(t -> instrumentResponse(request, null, t, startTime));
} }
private void instrumentResponse(ClientRequest request, ClientResponse response, Throwable t) { private void instrumentResponse(ClientRequest request, ClientResponse response, Throwable t, long startTime) {
Map<String, String> loadBalancerContext = MetadataContextHolder.get().getLoadbalancerMetadata(); Map<String, String> loadBalancerContext = MetadataContextHolder.get().getLoadbalancerMetadata();
String serviceId = loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID); String serviceId = loadBalancerContext.get(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID);
long delay = System.currentTimeMillis() - Long.parseLong(loadBalancerContext.get(HeaderConstant.INTERNAL_CALL_START_TIME)); long delay = System.currentTimeMillis() - startTime;
HttpHeaders requestHeaders = request.headers(); HttpHeaders requestHeaders = request.headers();
HttpHeaders responseHeaders = null; HttpHeaders responseHeaders = null;
@ -104,8 +80,6 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
ServiceCallResult resultRequest = createServiceCallResult( ServiceCallResult resultRequest = createServiceCallResult(
serviceId, serviceId,
null,
null,
request.url(), request.url(),
requestHeaders, requestHeaders,
responseHeaders, responseHeaders,
@ -119,8 +93,6 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
ResourceStat resourceStat = createInstanceResourceStat( ResourceStat resourceStat = createInstanceResourceStat(
serviceId, serviceId,
null,
null,
request.url(), request.url(),
status, status,
delay, delay,

@ -0,0 +1,20 @@
package com.tencent.cloud.rpc.enhancement.webclient;
import com.tencent.cloud.common.constant.HeaderConstant;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerClientRequestTransformer;
import org.springframework.web.reactive.function.client.ClientRequest;
public class PolarisLoadBalancerClientRequestTransformer implements LoadBalancerClientRequestTransformer {
@Override
public ClientRequest transformRequest(ClientRequest request, ServiceInstance instance) {
if (instance != null) {
MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID, instance.getServiceId());
}
return request;
}
}
Loading…
Cancel
Save