feat:support traffic mirroring. (#1703)

pull/1704/head
Haotian Zhang 3 weeks ago committed by GitHub
parent b97124e31e
commit ba181fc264
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -11,3 +11,4 @@
- [refactor: optimize log](https://github.com/Tencent/spring-cloud-tencent/pull/1700)
- [feat:support setting load balancing strategy per service.](https://github.com/Tencent/spring-cloud-tencent/pull/1701)
- [feat: support shortest response time lb and least connection lb](https://github.com/Tencent/spring-cloud-tencent/pull/1702)
- [feat:support traffic mirroring.](https://github.com/Tencent/spring-cloud-tencent/pull/1703)

@ -409,6 +409,7 @@
<excludeArtifact>quickstart-zuul-service</excludeArtifact>
<excludeArtifact>tsf-example</excludeArtifact>
<excludeArtifact>consumer-demo</excludeArtifact>
<excludeArtifact>msgw-scg</excludeArtifact>
<excludeArtifact>provider-demo</excludeArtifact>
</excludeArtifacts>
</configuration>

@ -74,5 +74,10 @@
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-multi-discovery-plugin</artifactId>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-traffic-mirroring-plugin</artifactId>
</dependency>
</dependencies>
</project>

@ -94,6 +94,12 @@
<artifactId>spring-cloud-starter-tencent-multi-discovery-plugin</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-traffic-mirroring-plugin</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -222,6 +222,12 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-traffic-mirroring-plugin</artifactId>
<version>${revision}</version>
</dependency>
<!-- third part framework dependencies -->
<dependency>
<groupId>org.springdoc</groupId>

@ -22,6 +22,7 @@ import java.net.URLDecoder;
import com.tencent.cloud.common.constant.MetadataConstant;
import com.tencent.cloud.quickstart.callee.config.DataSourceProperties;
import com.tencent.cloud.quickstart.callee.pojo.User;
import com.tencent.cloud.quickstart.callee.service.FaultToleranceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,6 +32,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -87,6 +90,12 @@ public class QuickstartCalleeController {
return String.format("Quickstart [%s] Service [%s:%s] is called. datasource = [%s].", appName, ip, port, dataSourceProperties);
}
@PostMapping("/user")
public String user(@RequestBody User user) {
LOG.info("Quickstart [{}] Service [{}:{}] is called. user = [{}].", appName, ip, port, user);
return String.format("Quickstart [%s] Service [%s:%s] is called. user = [%s].", appName, ip, port, user);
}
/**
* Get metadata in HTTP header.
*

@ -0,0 +1,54 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.quickstart.callee.pojo;
/**
*
*
* @author Haotian Zhang
*/
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

@ -22,6 +22,7 @@ import java.net.URLDecoder;
import com.tencent.cloud.common.constant.MetadataConstant;
import com.tencent.cloud.quickstart.callee.config.DataSourceProperties;
import com.tencent.cloud.quickstart.callee.pojo.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,6 +33,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -102,6 +105,12 @@ public class QuickstartCalleeController {
return metadataStr;
}
@PostMapping("/user")
public String user(@RequestBody User user) {
LOG.info("Quickstart [{}] Service [{}:{}] is called. user = [{}].", appName, ip, port, user);
return String.format("Quickstart [%s] Service [%s:%s] is called. user = [%s].", appName, ip, port, user);
}
/**
* Check circuit break.
*

@ -0,0 +1,54 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.quickstart.callee.pojo;
/**
*
*
* @author Haotian Zhang
*/
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.quickstart.caller.pojo.User;
import com.tencent.polaris.api.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +37,8 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -256,4 +259,9 @@ public class QuickstartCallerController {
return new ResponseEntity<>(httpClientErrorException.getResponseBodyAsString(), httpClientErrorException.getStatusCode());
}
}
@PostMapping("/user")
public String user(@RequestBody User user) {
return restTemplate.postForObject("http://QuickstartCalleeService/quickstart/callee/user", user, String.class);
}
}

@ -0,0 +1,54 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.quickstart.caller.pojo;
/**
*
*
* @author Haotian Zhang
*/
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import com.tencent.cloud.common.util.PolarisCompletableFutureUtils;
import com.tencent.cloud.tsf.demo.consumer.entity.User;
import com.tencent.cloud.tsf.demo.consumer.proxy.ProviderDemoService;
import com.tencent.cloud.tsf.demo.consumer.proxy.ProviderService;
import com.tencent.polaris.api.utils.StringUtils;
@ -32,6 +33,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.tsf.core.TsfContext;
import org.springframework.tsf.core.entity.Tag;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
@ -167,4 +170,14 @@ public class ConsumerController {
TsfContext.putTags(mTags, Tag.ControlFlag.TRANSITIVE);
return providerService.echo(str);
}
@PostMapping("/user-rest")
public String userRest(@RequestBody User user) {
return restTemplate.postForObject("http://provider-demo/user", user, String.class);
}
@PostMapping("/user-feign")
public String userFeign(@RequestBody User user) {
return providerDemoService.user(user);
}
}

@ -0,0 +1,54 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.tsf.demo.consumer.entity;
/**
*
*
* @author Haotian Zhang
*/
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

@ -17,8 +17,11 @@
package com.tencent.cloud.tsf.demo.consumer.proxy;
import com.tencent.cloud.tsf.demo.consumer.entity.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
@ -33,4 +36,7 @@ public interface ProviderDemoService {
@RequestMapping(value = "/echo/slow/{str}", method = RequestMethod.GET)
String echoSlow(@PathVariable("str") String str, @RequestParam("delay") int delay);
@RequestMapping(value = "/user", method = RequestMethod.POST)
String user(@RequestBody User user);
}

@ -25,6 +25,7 @@ import java.util.Enumeration;
import javax.servlet.http.HttpServletResponse;
import com.tencent.cloud.tsf.demo.provider.config.ProviderNameConfig;
import com.tencent.cloud.tsf.demo.provider.entity.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,6 +35,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
@ -172,4 +175,12 @@ public class ProviderController {
return "info is set to return HttpStatus.OK.";
}
}
@PostMapping("/user")
public String user(@RequestBody User user) {
String response = String.format("from host-ip: %s, request body: %s, response from %s",
getInet4Address(), user, providerNameConfig.getName());
LOG.info(response);
return response;
}
}

@ -0,0 +1,54 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.tsf.demo.provider.entity;
/**
*
*
* @author Haotian Zhang
*/
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

@ -23,6 +23,7 @@
<module>spring-cloud-starter-tencent-fault-tolerance</module>
<module>spring-cloud-tencent-security-protection-plugin</module>
<module>spring-cloud-starter-tencent-multi-discovery-plugin</module>
<module>spring-cloud-starter-tencent-traffic-mirroring-plugin</module>
</modules>
</project>

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-tencent-plugin-starters</artifactId>
<groupId>com.tencent.cloud</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-starter-tencent-traffic-mirroring-plugin</artifactId>
<name>Spring Cloud Starter Tencent Traffic Mirroring Plugin</name>
<dependencies>
<!-- Spring Cloud Tencent dependencies start -->
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-polaris-router</artifactId>
</dependency>
<!-- Spring Cloud Tencent dependencies end -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

@ -0,0 +1,47 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.trafficmirroring;
import com.tencent.cloud.plugin.trafficmirroring.config.TrafficMirroringProperties;
import com.tencent.cloud.rpc.enhancement.plugin.EnhancedPluginType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Traffic mirroring exception plugin.
*
* @author Haotian Zhang
*/
public class TrafficMirroringExceptionPlugin extends TrafficMirroringPostPlugin {
private static final Logger LOG = LoggerFactory.getLogger(TrafficMirroringExceptionPlugin.class);
public TrafficMirroringExceptionPlugin(TrafficMirroringProperties trafficMirroringProperties) {
super(trafficMirroringProperties);
}
@Override
public String getName() {
return TrafficMirroringExceptionPlugin.class.getName();
}
@Override
public EnhancedPluginType getType() {
return EnhancedPluginType.Client.EXCEPTION;
}
}

@ -0,0 +1,264 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.trafficmirroring;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.plugin.trafficmirroring.config.TrafficMirroringProperties;
import com.tencent.cloud.rpc.enhancement.plugin.EnhancedPlugin;
import com.tencent.cloud.rpc.enhancement.plugin.EnhancedPluginContext;
import com.tencent.cloud.rpc.enhancement.plugin.EnhancedPluginType;
import com.tencent.cloud.rpc.enhancement.utils.EnhancedRequestUtils;
import com.tencent.polaris.api.plugin.route.RouterConstants;
import com.tencent.polaris.api.utils.ClassUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pojo.Node;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.metadata.core.MetadataObjectValue;
import com.tencent.polaris.metadata.core.MetadataType;
import feign.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
import static com.tencent.cloud.rpc.enhancement.plugin.PluginOrderConstant.ClientPluginOrder.TRAFFIC_MIRRORING_PLUGIN_ORDER;
/**
* Traffic mirroring post plugin.
*
* @author Haotian Zhang
*/
public class TrafficMirroringPostPlugin implements EnhancedPlugin {
private static final Logger LOG = LoggerFactory.getLogger(TrafficMirroringPostPlugin.class);
private final TrafficMirroringProperties trafficMirroringProperties;
private ThreadPoolExecutor threadPoolExecutor;
private HttpClient httpClient;
public TrafficMirroringPostPlugin(TrafficMirroringProperties trafficMirroringProperties) {
this.trafficMirroringProperties = trafficMirroringProperties;
}
@Override
public String getName() {
return TrafficMirroringPostPlugin.class.getName();
}
@Override
public EnhancedPluginType getType() {
return EnhancedPluginType.Client.POST;
}
@Override
public void run(EnhancedPluginContext context) throws Throwable {
if (!trafficMirroringProperties.isEnabled()) {
return;
}
Object originalRequest = context.getOriginRequest();
// only RestTemplate and Feign request can be mirrored.
if (!(ClassUtils.isClassPresent("org.springframework.http.HttpRequest")
&& originalRequest instanceof HttpRequest)
&& !(ClassUtils.isClassPresent("feign.Request")
&& originalRequest instanceof Request)) {
return;
}
// get traffic mirroring address.
String destination = null;
try {
MetadataObjectValue<Node> metadataValue = MetadataContextHolder.get()
.getMetadataContainer(MetadataType.CUSTOM, false)
.getMetadataValue(RouterConstants.TRAFFIC_MIRRORING_NODE_KEY);
if (metadataValue != null) {
Optional<Node> trafficMirroringNode = metadataValue.getObjectValue();
if (trafficMirroringNode.isPresent()) {
destination = trafficMirroringNode.get().getHostPort();
}
}
}
catch (Throwable throwable) {
LOG.warn("Get traffic mirroring node failed.", throwable);
}
if (StringUtils.isBlank(destination)) {
return;
}
initIfNeeded();
byte[] body = context.getOriginBody();
// send mirroring request.
try {
sendMirroringRequest(originalRequest, body, destination);
}
catch (Exception e) {
LOG.warn("Traffic mirroring request failed.", e);
}
}
private void initIfNeeded() {
if (threadPoolExecutor == null) {
// create thread pool executor.
int cpuCores = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCores;
int maxPoolSize = trafficMirroringProperties.getRequestPoolSize();
if (cpuCores > trafficMirroringProperties.getRequestPoolSize()) {
corePoolSize = trafficMirroringProperties.getRequestPoolSize();
}
this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("polaris-traffic-mirroring"));
}
if (httpClient == null) {
// create JDK HttpClient instance.
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(trafficMirroringProperties.getRequestConnectionTimeout()))
.executor(threadPoolExecutor)
.build();
}
}
private void setHeaders(java.net.http.HttpRequest.Builder requestBuilder, Object originalRequest) {
if (ClassUtils.isClassPresent("org.springframework.http.HttpRequest")
&& originalRequest instanceof HttpRequest) {
HttpHeaders httpHeaders = ((HttpRequest) originalRequest).getHeaders();
httpHeaders.forEach((headerName, headerValues) -> {
if (shouldKeepHeader(headerName)) {
headerValues.forEach(headerValue ->
requestBuilder.header(headerName, headerValue));
}
});
}
else if (ClassUtils.isClassPresent("feign.Request")
&& originalRequest instanceof Request) {
Request request = (Request) originalRequest;
request.headers().forEach((headerName, headerValues) -> {
if (shouldKeepHeader(headerName)) {
headerValues.forEach(headerValue ->
requestBuilder.header(headerName, headerValue));
}
});
}
}
/**
* Send mirroring request to specified destination.
*/
private void sendMirroringRequest(Object originalRequest, byte[] originalBody, String destination) {
// rebuilt mirroring url: replace original host with specified destination.
URI originalUri = EnhancedRequestUtils.getUri(originalRequest);
String method = EnhancedRequestUtils.getMethod(originalRequest);
String mirroringUrl = rebuildUrlWithDestination(originalUri, destination);
if (StringUtils.isBlank(mirroringUrl)) {
LOG.debug("Traffic mirroring request url is empty.");
return;
}
if (originalBody == null) {
originalBody = new byte[0];
}
// create JDK HttpRequest builder.
java.net.http.HttpRequest.Builder requestBuilder = java.net.http.HttpRequest.newBuilder()
.uri(URI.create(mirroringUrl))
.method(method,
java.net.http.HttpRequest.BodyPublishers.ofByteArray(originalBody));
// copy request headers (except some special headers).
setHeaders(requestBuilder, originalRequest);
// build request.
java.net.http.HttpRequest mirrorRequest = requestBuilder.build();
// send async request (ignore response).
httpClient.sendAsync(mirrorRequest, HttpResponse.BodyHandlers.discarding())
.thenAccept(response -> {
if (response.statusCode() >= 400) {
LOG.warn("Traffic mirroring async request return: {}", response.statusCode());
}
else {
LOG.debug("Traffic mirroring async request return: {}", response.statusCode());
}
})
.exceptionally(ex -> {
LOG.warn("Traffic mirroring async request failed", ex);
return null;
});
}
/**
* Rebuild url with specified destination.
*/
private String rebuildUrlWithDestination(URI originalUri, String destination) {
if (StringUtils.isBlank(destination)) {
return null;
}
String scheme = originalUri.getScheme();
String path = originalUri.getRawPath();
String query = originalUri.getRawQuery();
// fill default port.
if (!StringUtils.contains(destination, ":")) {
destination += (scheme.equals("https") ? ":443" : ":80");
}
// build full mirroring url.
String finalUrl = String.format("%s://%s%s%s",
scheme,
destination,
path != null ? path : "",
query != null ? "?" + query : "");
LOG.debug("Traffic mirroring request url: {}", finalUrl);
return finalUrl;
}
/**
* check header should be kept.
*/
private boolean shouldKeepHeader(String headerName) {
return !headerName.equalsIgnoreCase("connection") &&
!headerName.equalsIgnoreCase("content-length") &&
!headerName.equalsIgnoreCase("expect") &&
!headerName.equalsIgnoreCase("host") &&
!headerName.equalsIgnoreCase("upgrade");
}
@Override
public void handlerThrowable(EnhancedPluginContext context, Throwable throwable) {
LOG.error("TrafficMirroringPostPlugin runs failed. context=[{}].", context, throwable);
}
@Override
public int getOrder() {
return TRAFFIC_MIRRORING_PLUGIN_ORDER;
}
}

@ -0,0 +1,49 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.trafficmirroring.config;
import com.tencent.cloud.plugin.trafficmirroring.TrafficMirroringExceptionPlugin;
import com.tencent.cloud.plugin.trafficmirroring.TrafficMirroringPostPlugin;
import com.tencent.cloud.polaris.router.config.ConditionalOnPolarisRouterEnabled;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Auto configuration for traffic mirroring plugin.
*
* @author Haotian Zhang
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnPolarisRouterEnabled
@ConditionalOnProperty(value = "spring.cloud.polaris.traffic-mirroring.enabled", matchIfMissing = true)
@EnableConfigurationProperties(TrafficMirroringProperties.class)
public class TrafficMirroringAutoConfiguration {
@Bean
public TrafficMirroringPostPlugin trafficMirrorPostPlugin(TrafficMirroringProperties trafficMirrorProperties) {
return new TrafficMirroringPostPlugin(trafficMirrorProperties);
}
@Bean
public TrafficMirroringExceptionPlugin trafficMirrorExceptionPlugin(TrafficMirroringProperties trafficMirrorProperties) {
return new TrafficMirroringExceptionPlugin(trafficMirrorProperties);
}
}

@ -0,0 +1,77 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.trafficmirroring.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Properties for traffic mirroring.
*
* @author Haotian Zhang
*/
@ConfigurationProperties("spring.cloud.polaris.traffic-mirroring")
public class TrafficMirroringProperties {
/**
* If traffic mirroring is enabled. Default is true.
*/
private boolean enabled = true;
/**
* Traffic mirroring request pool size. Default is 100.
*/
private int requestPoolSize = 100;
/**
* Traffic mirroring request connection timeout in millisecond. Default is 5000.
*/
private long requestConnectionTimeout = 5000;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public int getRequestPoolSize() {
return requestPoolSize;
}
public void setRequestPoolSize(int requestPoolSize) {
this.requestPoolSize = requestPoolSize;
}
public long getRequestConnectionTimeout() {
return requestConnectionTimeout;
}
public void setRequestConnectionTimeout(long requestConnectionTimeout) {
this.requestConnectionTimeout = requestConnectionTimeout;
}
@Override
public String toString() {
return "TrafficMirroringProperties{" +
"enabled=" + enabled +
", requestPoolSize=" + requestPoolSize +
", requestConnectionTimeout=" + requestConnectionTimeout +
'}';
}
}

@ -0,0 +1,102 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.trafficmirroring.config;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link TrafficMirroringProperties}.
*
* @author Haotian Zhang
*/
public class TrafficMirroringPropertiesTest {
@Test
void testDefaultValues() {
TrafficMirroringProperties properties = new TrafficMirroringProperties();
assertThat(properties.isEnabled()).isTrue();
assertThat(properties.getRequestPoolSize()).isEqualTo(4);
assertThat(properties.getRequestConnectionTimeout()).isEqualTo(5000L);
}
@Test
void testEnabledProperty() {
TrafficMirroringProperties properties = new TrafficMirroringProperties();
properties.setEnabled(false);
assertThat(properties.isEnabled()).isFalse();
}
@Test
void testRequestPoolSizeProperty() {
TrafficMirroringProperties properties = new TrafficMirroringProperties();
properties.setRequestPoolSize(10);
assertThat(properties.getRequestPoolSize()).isEqualTo(10);
}
@Test
void testRequestPoolSizeBoundaryValues() {
TrafficMirroringProperties properties = new TrafficMirroringProperties();
properties.setRequestPoolSize(0);
assertThat(properties.getRequestPoolSize()).isZero();
properties.setRequestPoolSize(-5);
assertThat(properties.getRequestPoolSize()).isNegative();
}
@Test
void testRequestConnectionTimeoutProperty() {
TrafficMirroringProperties properties = new TrafficMirroringProperties();
properties.setRequestConnectionTimeout(3000L);
assertThat(properties.getRequestConnectionTimeout()).isEqualTo(3000L);
}
@Test
void testRequestConnectionTimeoutBoundaryValues() {
TrafficMirroringProperties properties = new TrafficMirroringProperties();
properties.setRequestConnectionTimeout(0L);
assertThat(properties.getRequestConnectionTimeout()).isZero();
properties.setRequestConnectionTimeout(-1000L);
assertThat(properties.getRequestConnectionTimeout()).isNegative();
}
@Test
void testToString() {
TrafficMirroringProperties properties = new TrafficMirroringProperties();
properties.setEnabled(false);
properties.setRequestPoolSize(8);
properties.setRequestConnectionTimeout(2000L);
String result = properties.toString();
assertThat(result)
.contains("enabled=false")
.contains("requestPoolSize=8")
.contains("requestConnectionTimeout=2000");
}
}

@ -70,6 +70,12 @@
<artifactId>spring-boot-starter-web</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

@ -192,6 +192,12 @@ public class RpcEnhancementAutoConfiguration {
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
@ConditionalOnClass(name = "org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor")
public BlockingLoadBalancerClientBeanPostProcessor loadBalancerInterceptorBeanPostProcessor() {
return new BlockingLoadBalancerClientBeanPostProcessor();
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = {"org.springframework.cloud.client.loadbalancer.LoadBalancerRequestTransformer"})

@ -87,6 +87,9 @@ public class EnhancedFeignClient implements Client {
.build();
enhancedPluginContext.setRequest(enhancedRequestContext);
enhancedPluginContext.setOriginRequest(request);
if (request.body() != null) {
enhancedPluginContext.setOriginBody(request.body());
}
enhancedPluginContext.setLocalServiceInstance(pluginRunner.getLocalServiceInstance());
String svcName = serviceUrl.getHost();

@ -61,7 +61,7 @@ public class EnhancedRestTemplateBlockingLoadBalancerClientInterceptor {
}
public <T> T intercept(HttpRequest httpRequest, String serviceId, ServiceInstance serviceInstance,
public <T> T intercept(HttpRequest httpRequest, byte[] body, String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> loadBalancerRequest) throws IOException {
EnhancedPluginContext enhancedPluginContext = EnhancedPluginUtils.createEnhancedPluginContext();
@ -83,6 +83,9 @@ public class EnhancedRestTemplateBlockingLoadBalancerClientInterceptor {
.build();
enhancedPluginContext.setRequest(enhancedRequestContext);
enhancedPluginContext.setOriginRequest(httpRequest);
if (body != null) {
enhancedPluginContext.setOriginBody(body);
}
enhancedPluginContext.setLocalServiceInstance(pluginRunner.getLocalServiceInstance());
@ -111,7 +114,7 @@ public class EnhancedRestTemplateBlockingLoadBalancerClientInterceptor {
enhancedResponseContextBuilder.httpStatus(((ClientHttpResponse) response).getStatusCode().value());
enhancedResponseContextBuilder.httpHeaders(((ClientHttpResponse) response).getHeaders());
}
EnhancedResponseContext enhancedResponseContext = enhancedResponseContextBuilder.build();
EnhancedResponseContext enhancedResponseContext = enhancedResponseContextBuilder.build();
enhancedPluginContext.setResponse(enhancedResponseContext);

@ -35,9 +35,8 @@ import org.springframework.http.HttpRequest;
*/
public class PolarisBlockingLoadBalancerClient extends BlockingLoadBalancerClient {
private BlockingLoadBalancerClient delegate;
private final EnhancedPluginRunner enhancedPluginRunner;
private BlockingLoadBalancerClient delegate;
public PolarisBlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory,
BlockingLoadBalancerClient delegate, EnhancedPluginRunner enhancedPluginRunner) {
@ -55,8 +54,9 @@ public class PolarisBlockingLoadBalancerClient extends BlockingLoadBalancerClien
if (httpRequest == null || enhancedPluginRunner == null) {
return delegate.execute(serviceId, request);
}
byte[] body = LoadBalancerUtils.getHttpBodyIfAvailable(request);
EnhancedRestTemplateBlockingLoadBalancerClientInterceptor enhancedRestTemplateBlockingLoadBalancerClientInterceptor = new EnhancedRestTemplateBlockingLoadBalancerClientInterceptor(enhancedPluginRunner, delegate);
return enhancedRestTemplateBlockingLoadBalancerClientInterceptor.intercept(httpRequest, serviceId, null, request);
return enhancedRestTemplateBlockingLoadBalancerClientInterceptor.intercept(httpRequest, body, serviceId, null, request);
}
/**
@ -68,7 +68,8 @@ public class PolarisBlockingLoadBalancerClient extends BlockingLoadBalancerClien
if (httpRequest == null || serviceInstance == null || enhancedPluginRunner == null) {
return delegate.execute(serviceId, serviceInstance, request);
}
byte[] body = LoadBalancerUtils.getHttpBodyIfAvailable(request);
EnhancedRestTemplateBlockingLoadBalancerClientInterceptor enhancedRestTemplateBlockingLoadBalancerClientInterceptor = new EnhancedRestTemplateBlockingLoadBalancerClientInterceptor(enhancedPluginRunner, delegate);
return enhancedRestTemplateBlockingLoadBalancerClientInterceptor.intercept(httpRequest, serviceId, serviceInstance, request);
return enhancedRestTemplateBlockingLoadBalancerClientInterceptor.intercept(httpRequest, body, serviceId, serviceInstance, request);
}
}

@ -39,6 +39,7 @@ public class EnhancedPluginContext {
private static final Logger LOGGER = LoggerFactory.getLogger(EnhancedPluginContext.class);
private final Map<String, Object> extraData = new ConcurrentHashMap<>();
private Object originRequest;
private byte[] originBody;
private EnhancedRequestContext request;
private EnhancedResponseContext response;
private Throwable throwable;
@ -57,6 +58,20 @@ public class EnhancedPluginContext {
this.originRequest = originRequest;
}
public byte[] getOriginBody() {
return originBody;
}
public void setOriginBody(byte[] originBody) {
if (originBody != null) {
this.originBody = new byte[originBody.length];
System.arraycopy(originBody, 0, this.originBody, 0, originBody.length);
}
else {
this.originBody = new byte[0];
}
}
public EnhancedRequestContext getRequest() {
return request;
}

@ -34,7 +34,7 @@ public class PluginOrderConstant {
* and
* {@link com.tencent.cloud.rpc.enhancement.plugin.reporter.ExceptionPolarisReporter}.
*/
public static final int CONSUMER_REPORTER_PLUGIN_ORDER = Ordered.HIGHEST_PRECEDENCE + 1;
public static final int CONSUMER_REPORTER_PLUGIN_ORDER = Ordered.HIGHEST_PRECEDENCE + 10;
/**
* order for
@ -42,7 +42,15 @@ public class PluginOrderConstant {
* and
* {@link com.tencent.cloud.polaris.circuitbreaker.reporter.ExceptionCircuitBreakerReporter}.
*/
public static final int CIRCUIT_BREAKER_REPORTER_PLUGIN_ORDER = Ordered.HIGHEST_PRECEDENCE + 2;
public static final int CIRCUIT_BREAKER_REPORTER_PLUGIN_ORDER = Ordered.HIGHEST_PRECEDENCE + 30;
/**
* order for
* {@link com.tencent.cloud.plugin.trafficmirroring.TrafficMirrorPostPlugin}
* and
* {@link com.tencent.cloud.plugin.trafficmirroring.TrafficMirrorExceptionPlugin}.
*/
public static final int TRAFFIC_MIRRORING_PLUGIN_ORDER = Ordered.HIGHEST_PRECEDENCE + 20;
/**
* order for

@ -0,0 +1,85 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.rpc.enhancement.utils;
import java.net.URI;
import com.tencent.polaris.api.utils.ClassUtils;
import feign.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpRequest;
/**
* Utils for enhanced request.
*
* @author Haotian Zhang
*/
public final class EnhancedRequestUtils {
private static final Logger LOG = LoggerFactory.getLogger(EnhancedRequestUtils.class);
private EnhancedRequestUtils() {
}
/**
* Get uri from original request.
*
* @param originalRequest original request
* @return URI
*/
public static URI getUri(Object originalRequest) {
URI uri = null;
if (ClassUtils.isClassPresent("org.springframework.http.HttpRequest")
&& originalRequest instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) originalRequest;
uri = httpRequest.getURI();
LOG.debug("Get uri: {} from org.springframework.http.HttpRequest", uri);
}
else if (ClassUtils.isClassPresent("feign.Request")
&& originalRequest instanceof Request) {
Request request = (Request) originalRequest;
uri = URI.create(request.url());
LOG.debug("Get uri: {} from feign.Request", uri);
}
return uri;
}
/**
* Get method from original request.
*
* @param originalRequest original request
* @return method
*/
public static String getMethod(Object originalRequest) {
String method = "GET";
if (ClassUtils.isClassPresent("org.springframework.http.HttpRequest")
&& originalRequest instanceof HttpRequest) {
method = ((HttpRequest) originalRequest).getMethod().name();
LOG.debug("Get method: {} from org.springframework.http.HttpRequest", method);
}
else if (ClassUtils.isClassPresent("feign.Request")
&& originalRequest instanceof Request) {
method = ((Request) originalRequest).httpMethod().name();
LOG.debug("Get method: {} from feign.Request", method);
}
return method;
}
}

@ -50,8 +50,8 @@ public final class LoadBalancerUtils {
/**
* if request is a LoadBalancerRequestAdapter(RetryLoadBalancerInterceptor), return its delegate.
*/
public static LoadBalancerRequest<?> getDelegateLoadBalancerRequestIfAvailable(LoadBalancerRequest<?> request) {
if (!(request instanceof LoadBalancerRequestAdapter)) {
public static LoadBalancerRequest<?> getDelegateLoadBalancerRequestIfAvailable(LoadBalancerRequest<?> request) {
if (!(request instanceof LoadBalancerRequestAdapter)) {
if (logger.isDebugEnabled()) {
logger.debug("LoadBalancerRequest is not a LoadBalancerRequestAdapter, request:{}", request);
}
@ -72,4 +72,38 @@ public final class LoadBalancerUtils {
return null;
}
/**
* if request is a BlockingLoadBalancerRequest, return its body.
*/
public static byte[] getHttpBodyIfAvailable(LoadBalancerRequest<?> request) {
if (request instanceof BlockingLoadBalancerRequest) {
BlockingLoadBalancerRequest blockingRequest = (BlockingLoadBalancerRequest) request;
try {
// get clientHttpRequestData field from BlockingLoadBalancerRequest
Field clientHttpRequestDataField = BlockingLoadBalancerRequest.class.getDeclaredField("clientHttpRequestData");
clientHttpRequestDataField.setAccessible(true);
BlockingLoadBalancerRequest.ClientHttpRequestData clientHttpRequestData = (BlockingLoadBalancerRequest.ClientHttpRequestData) clientHttpRequestDataField.get(blockingRequest);
// get body field from clientHttpRequestData
Field bodyField = BlockingLoadBalancerRequest.ClientHttpRequestData.class.getDeclaredField("body");
bodyField.setAccessible(true);
return (byte[]) bodyField.get(clientHttpRequestData);
}
catch (Exception e) {
// ignore
if (logger.isDebugEnabled()) {
logger.debug("Failed to get body from BlockingLoadBalancerRequest, request:{}", request, e);
}
}
return null;
}
else {
if (logger.isDebugEnabled()) {
logger.debug("LoadBalancerRequest is not a BlockingLoadBalancerRequest, request:{}", request);
}
return null;
}
}
}

@ -18,6 +18,7 @@
package com.tencent.cloud.rpc.enhancement.instrument.feign;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -118,18 +119,18 @@ public class EnhancedFeignClientTest {
// 200
Response response = polarisFeignClient.execute(Request.create(Request.HttpMethod.GET, "http://localhost:8080/test",
Collections.emptyMap(), null, requestTemplate), null);
Collections.emptyMap(), new byte[0], StandardCharsets.UTF_8, requestTemplate), null);
assertThat(response.status()).isEqualTo(200);
// 502
response = polarisFeignClient.execute(Request.create(Request.HttpMethod.POST, "http://localhost:8080/test",
Collections.emptyMap(), null, requestTemplate), null);
Collections.emptyMap(), new byte[0], StandardCharsets.UTF_8, requestTemplate), null);
assertThat(response.status()).isEqualTo(502);
// Exception
try {
polarisFeignClient.execute(Request.create(Request.HttpMethod.DELETE, "http://localhost:8080/test",
Collections.emptyMap(), null, requestTemplate), null);
Collections.emptyMap(), new byte[0], StandardCharsets.UTF_8, requestTemplate), null);
fail("IOException should be thrown.");
}
catch (Throwable t) {
@ -166,7 +167,7 @@ public class EnhancedFeignClientTest {
// Exception
try {
polarisFeignClient.execute(Request.create(Request.HttpMethod.GET, "http://localhost:8080/test",
Collections.emptyMap(), null, requestTemplate), null);
Collections.emptyMap(), new byte[0], StandardCharsets.UTF_8, requestTemplate), null);
fail("CallAbortedException should be thrown.");
}
catch (CallAbortedException t) {
@ -180,7 +181,7 @@ public class EnhancedFeignClientTest {
// fallback 200
Response response = polarisFeignClient.execute(Request.create(Request.HttpMethod.GET, "http://localhost:8080/test",
Collections.emptyMap(), null, requestTemplate), null);
Collections.emptyMap(), new byte[0], StandardCharsets.UTF_8, requestTemplate), null);
assertThat(response.status()).isEqualTo(200);
}

Loading…
Cancel
Save