feat:gateway支持上报调用数据

pull/924/head
chuntaojun 3 years ago
parent 4403df66d7
commit df3a7aaabe

@ -158,7 +158,7 @@ public class RpcEnhancementAutoConfiguration {
@Bean @Bean
public ExchangeFilterFunction exchangeFilterFunction( public ExchangeFilterFunction exchangeFilterFunction(
RpcEnhancementReporterProperties properties, SDKContext context, ConsumerAPI consumerAPI) { RpcEnhancementReporterProperties properties, SDKContext context, ConsumerAPI consumerAPI) {
return new EnhancedWebClientReporter(properties, consumerAPI); return new EnhancedWebClientReporter(properties, context, consumerAPI);
} }
} }

@ -36,10 +36,8 @@ import io.netty.handler.codec.http.HttpHeaders;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig; import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse; import reactor.netty.http.client.HttpClientResponse;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
@ -125,23 +123,20 @@ public class EnhancedPolarisHttpClient extends HttpClient {
} }
private void registerReportHandler() { private void registerReportHandler() {
target = target.doOnRequest(new BiConsumer<HttpClientRequest, Connection>() { target = target.doOnRequest((request, connection) -> {
@Override String serviceId = request.requestHeaders().get(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID);
public void accept(HttpClientRequest request, Connection connection) { String host = request.requestHeaders().get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST);
String serviceId = request.requestHeaders().get(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID); String port = request.requestHeaders().get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT);
String host = request.requestHeaders().get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST); if (StringUtils.isNotBlank(serviceId)) {
String port = request.requestHeaders().get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT); MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID, serviceId);
if (StringUtils.isNotBlank(serviceId)) { MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST, host);
MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID, serviceId); MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT, port);
MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST, host); MetadataContextHolder.get().setLoadbalancer("startTime", System.currentTimeMillis() + "");
MetadataContextHolder.get().setLoadbalancer(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT, port);
MetadataContextHolder.get().setLoadbalancer("startTime", System.currentTimeMillis() + "");
}
request.requestHeaders().remove(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID);
request.requestHeaders().remove(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST);
request.requestHeaders().remove(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT);
} }
request.requestHeaders().remove(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID);
request.requestHeaders().remove(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST);
request.requestHeaders().remove(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT);
}); });
target = target.doOnResponse((httpClientResponse, connection) -> handler.accept(httpClientResponse, null)); target = target.doOnResponse((httpClientResponse, connection) -> handler.accept(httpClientResponse, null));
target = target.doOnResponseError(handler); target = target.doOnResponseError(handler);

@ -36,7 +36,6 @@ import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.ServiceCallResult; import com.tencent.polaris.api.rpc.ServiceCallResult;
import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.client.api.SDKContext; import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.discovery.client.api.DefaultConsumerAPI;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -54,18 +53,16 @@ import static com.tencent.cloud.common.constant.ContextConstant.UTF_8;
public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter implements ExchangeFilterFunction { public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter implements ExchangeFilterFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(EnhancedWebClientReporter.class); protected static final String METRICS_WEBCLIENT_START_TIME = EnhancedWebClientReporter.class.getName()
private static final String METRICS_WEBCLIENT_START_TIME = EnhancedWebClientReporter.class.getName()
+ ".START_TIME"; + ".START_TIME";
private static final Logger LOGGER = LoggerFactory.getLogger(EnhancedWebClientReporter.class);
private final ConsumerAPI consumerAPI; private final ConsumerAPI consumerAPI;
private final SDKContext context; private final SDKContext context;
public EnhancedWebClientReporter(RpcEnhancementReporterProperties reportProperties, ConsumerAPI consumerAPI) { public EnhancedWebClientReporter(RpcEnhancementReporterProperties reportProperties, SDKContext context, ConsumerAPI consumerAPI) {
super(reportProperties); super(reportProperties);
this.context = ((DefaultConsumerAPI) consumerAPI).getSDKContext(); this.context = context;
this.consumerAPI = consumerAPI; this.consumerAPI = consumerAPI;
} }
@ -75,10 +72,10 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
.contextWrite(this::putStartTime); .contextWrite(this::putStartTime);
} }
private Mono<ClientResponse> instrumentResponse(ClientRequest request, Mono<ClientResponse> responseMono) { Mono<ClientResponse> instrumentResponse(ClientRequest request, Mono<ClientResponse> responseMono) {
return Mono.deferContextual((ctx) -> responseMono.doOnEach((signal) -> { return Mono.deferContextual((ctx) -> responseMono.doOnEach((signal) -> {
// report result to polaris // report result to polaris
if (reportProperties.isEnabled()) { if (!reportProperties.isEnabled()) {
return; return;
} }
ServiceCallResult callResult = new ServiceCallResult(); ServiceCallResult callResult = new ServiceCallResult();
@ -110,7 +107,9 @@ public class EnhancedWebClientReporter extends AbstractPolarisReporterAdapter im
callResult.setHost(uri.getHost()); callResult.setHost(uri.getHost());
// -1 means access directly by url, and use http default port number 80 // -1 means access directly by url, and use http default port number 80
callResult.setPort(uri.getPort() == -1 ? 80 : uri.getPort()); callResult.setPort(uri.getPort() == -1 ? 80 : uri.getPort());
callResult.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP()); if (Objects.nonNull(context)) {
callResult.setCallerIp(context.getConfig().getGlobal().getAPI().getBindIP());
}
RetStatus retStatus = RetStatus.RetSuccess; RetStatus retStatus = RetStatus.RetSuccess;
ClientResponse response = signal.get(); ClientResponse response = signal.get();

@ -0,0 +1,49 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.rpc.enhancement.scg;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.client.api.SDKContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.netty.http.client.HttpClient;
public class EnhancedPolarisHttpClientCustomizerTest {
@Test
public void testCustomize() {
RpcEnhancementReporterProperties properties = new RpcEnhancementReporterProperties();
properties.setEnabled(true);
properties.getStatuses().clear();
properties.getSeries().clear();
SDKContext context = Mockito.mock(SDKContext.class);
ConsumerAPI consumerAPI = Mockito.mock(ConsumerAPI.class);
EnhancedPolarisHttpClientCustomizer clientCustomizer = new EnhancedPolarisHttpClientCustomizer(properties, context, consumerAPI);
HttpClient client = HttpClient.create();
HttpClient proxyClient = clientCustomizer.customize(client);
Assertions.assertNotNull(proxyClient);
Assertions.assertEquals(EnhancedPolarisHttpClient.class.getName(), proxyClient.getClass().getName());
}
}

@ -0,0 +1,57 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.rpc.enhancement.scg;
import java.util.Collections;
import com.tencent.cloud.common.constant.HeaderConstant;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.http.HttpHeaders;
import org.springframework.web.server.ServerWebExchange;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_LOADBALANCER_RESPONSE_ATTR;
public class EnhancedPolarisHttpHeadersFilterTest {
@Test
public void testFilter() {
EnhancedPolarisHttpHeadersFilter filter = new EnhancedPolarisHttpHeadersFilter();
ServiceInstance instance = Mockito.mock(ServiceInstance.class);
Mockito.doReturn("mock_service").when(instance).getServiceId();
Mockito.doReturn("127.0.0.1").when(instance).getHost();
Mockito.doReturn(8080).when(instance).getPort();
DefaultResponse response = new DefaultResponse(instance);
ServerWebExchange exchange = Mockito.mock(ServerWebExchange.class);
Mockito.doReturn(response).when(exchange).getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR);
HttpHeaders input = new HttpHeaders();
HttpHeaders headers = filter.filter(input, exchange);
Assertions.assertEquals(Collections.singletonList("mock_service"), headers.get(HeaderConstant.INTERNAL_CALLEE_SERVICE_ID));
Assertions.assertEquals(Collections.singletonList("127.0.0.1"), headers.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_HOST));
Assertions.assertEquals(Collections.singletonList("8080"), headers.get(HeaderConstant.INTERNAL_CALLEE_INSTANCE_PORT));
}
}

@ -0,0 +1,102 @@
/*
* Tencent is pleased to support the open source community by making Spring Cloud Tencent available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.rpc.enhancement.webclient;
import java.net.URI;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.util.ApplicationContextAwareUtils;
import com.tencent.cloud.rpc.enhancement.config.RpcEnhancementReporterProperties;
import com.tencent.polaris.api.core.ConsumerAPI;
import com.tencent.polaris.api.rpc.ServiceCallResult;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import static com.tencent.cloud.rpc.enhancement.webclient.EnhancedWebClientReporter.METRICS_WEBCLIENT_START_TIME;
import static com.tencent.polaris.test.common.Consts.NAMESPACE_TEST;
import static com.tencent.polaris.test.common.Consts.SERVICE_PROVIDER;
import static org.mockito.ArgumentMatchers.anyString;
public class EnhancedWebClientReporterTest {
private static final String URI_TEMPLATE_ATTRIBUTE = EnhancedWebClientReporterTest.class.getName() + ".uriTemplate";
private static MockedStatic<ApplicationContextAwareUtils> mockedApplicationContextAwareUtils;
@BeforeAll
static void beforeAll() {
mockedApplicationContextAwareUtils = Mockito.mockStatic(ApplicationContextAwareUtils.class);
mockedApplicationContextAwareUtils.when(() -> ApplicationContextAwareUtils.getProperties(anyString()))
.thenReturn("unit-test");
}
@AfterAll
static void afterAll() {
mockedApplicationContextAwareUtils.close();
}
@BeforeEach
void setUp() {
MetadataContext.LOCAL_NAMESPACE = NAMESPACE_TEST;
MetadataContext.LOCAL_SERVICE = SERVICE_PROVIDER;
}
@Test
public void testInstrumentResponse() {
ClientResponse response = Mockito.mock(ClientResponse.class);
ClientResponse.Headers headers = Mockito.mock(ClientResponse.Headers.class);
Mockito.doReturn(headers).when(response).headers();
Mockito.doReturn(new HttpHeaders()).when(headers).asHttpHeaders();
Mono<ClientResponse> responseMono = Mono.just(response);
ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("https://example.org/projects/spring-boot"))
.attribute(URI_TEMPLATE_ATTRIBUTE, "https://example.org/projects/{project}")
.build();
ConsumerAPI consumerAPI = Mockito.mock(ConsumerAPI.class);
Mockito.doAnswer(invocationOnMock -> {
ServiceCallResult result = invocationOnMock.getArgument(0, ServiceCallResult.class);
Assertions.assertTrue(result.getDelay() > 0);
return null;
}).when(consumerAPI)
.updateServiceCallResult(Mockito.any(ServiceCallResult.class));
RpcEnhancementReporterProperties properties = new RpcEnhancementReporterProperties();
properties.setEnabled(true);
properties.getStatuses().clear();
properties.getSeries().clear();
EnhancedWebClientReporter reporter = new EnhancedWebClientReporter(properties, null, consumerAPI);
reporter.instrumentResponse(request, responseMono)
.contextWrite(context -> context.put(METRICS_WEBCLIENT_START_TIME, System.currentTimeMillis()))
.subscribe();
}
}
Loading…
Cancel
Save