|
|
|
@ -17,10 +17,10 @@
|
|
|
|
|
|
|
|
|
|
package com.tencent.cloud.plugin.trafficmirroring;
|
|
|
|
|
|
|
|
|
|
import java.io.OutputStream;
|
|
|
|
|
import java.net.HttpURLConnection;
|
|
|
|
|
import java.net.URI;
|
|
|
|
|
import java.net.http.HttpClient;
|
|
|
|
|
import java.net.http.HttpResponse;
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.net.URL;
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
@ -61,8 +61,6 @@ public class TrafficMirroringPostPlugin implements EnhancedPlugin {
|
|
|
|
|
|
|
|
|
|
private ThreadPoolExecutor threadPoolExecutor;
|
|
|
|
|
|
|
|
|
|
private HttpClient httpClient;
|
|
|
|
|
|
|
|
|
|
public TrafficMirroringPostPlugin(TrafficMirroringProperties trafficMirroringProperties) {
|
|
|
|
|
this.trafficMirroringProperties = trafficMirroringProperties;
|
|
|
|
|
}
|
|
|
|
@ -137,23 +135,16 @@ public class TrafficMirroringPostPlugin implements EnhancedPlugin {
|
|
|
|
|
60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("polaris-traffic-mirroring"));
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
if (httpClient == null) {
|
|
|
|
|
// create JDK HttpClient instance.
|
|
|
|
|
this.httpClient = HttpClient.newBuilder()
|
|
|
|
|
.connectTimeout(Duration.ofMillis(trafficMirroringProperties.getRequestConnectionTimeout()))
|
|
|
|
|
.executor(threadPoolExecutor)
|
|
|
|
|
.build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void setHeaders(java.net.http.HttpRequest.Builder requestBuilder, Object originalRequest) {
|
|
|
|
|
private void setHeaders(HttpURLConnection connection, Object originalRequest) {
|
|
|
|
|
if (ClassUtils.isClassPresent("org.springframework.http.HttpRequest")
|
|
|
|
|
&& originalRequest instanceof HttpRequest) {
|
|
|
|
|
HttpHeaders httpHeaders = ((HttpRequest) originalRequest).getHeaders();
|
|
|
|
|
httpHeaders.forEach((headerName, headerValues) -> {
|
|
|
|
|
if (shouldKeepHeader(headerName)) {
|
|
|
|
|
headerValues.forEach(headerValue ->
|
|
|
|
|
requestBuilder.header(headerName, headerValue));
|
|
|
|
|
connection.setRequestProperty(headerName, headerValue));
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@ -163,7 +154,7 @@ public class TrafficMirroringPostPlugin implements EnhancedPlugin {
|
|
|
|
|
request.headers().forEach((headerName, headerValues) -> {
|
|
|
|
|
if (shouldKeepHeader(headerName)) {
|
|
|
|
|
headerValues.forEach(headerValue ->
|
|
|
|
|
requestBuilder.header(headerName, headerValue));
|
|
|
|
|
connection.setRequestProperty(headerName, headerValue));
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@ -185,31 +176,38 @@ public class TrafficMirroringPostPlugin implements EnhancedPlugin {
|
|
|
|
|
originalBody = new byte[0];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// create JDK HttpRequest builder.
|
|
|
|
|
java.net.http.HttpRequest.Builder requestBuilder = java.net.http.HttpRequest.newBuilder()
|
|
|
|
|
.uri(URI.create(mirroringUrl))
|
|
|
|
|
.method(method,
|
|
|
|
|
java.net.http.HttpRequest.BodyPublishers.ofByteArray(originalBody));
|
|
|
|
|
byte[] finalOriginalBody = originalBody;
|
|
|
|
|
threadPoolExecutor.execute(() -> {
|
|
|
|
|
try {
|
|
|
|
|
URL url = new URL(mirroringUrl);
|
|
|
|
|
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
|
|
|
|
connection.setRequestMethod(method);
|
|
|
|
|
connection.setConnectTimeout((int) trafficMirroringProperties.getRequestConnectionTimeout());
|
|
|
|
|
connection.setReadTimeout((int) trafficMirroringProperties.getRequestConnectionTimeout());
|
|
|
|
|
|
|
|
|
|
// copy request headers (except some special headers).
|
|
|
|
|
setHeaders(requestBuilder, originalRequest);
|
|
|
|
|
setHeaders(connection, originalRequest);
|
|
|
|
|
|
|
|
|
|
// build request.
|
|
|
|
|
java.net.http.HttpRequest mirrorRequest = requestBuilder.build();
|
|
|
|
|
// set request body.
|
|
|
|
|
if (finalOriginalBody.length > 0) {
|
|
|
|
|
connection.setDoOutput(true);
|
|
|
|
|
try (OutputStream os = connection.getOutputStream()) {
|
|
|
|
|
os.write(finalOriginalBody);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// send async request (ignore response).
|
|
|
|
|
httpClient.sendAsync(mirrorRequest, HttpResponse.BodyHandlers.discarding())
|
|
|
|
|
.thenAccept(response -> {
|
|
|
|
|
if (response.statusCode() >= 400) {
|
|
|
|
|
LOG.warn("Traffic mirroring async request return: {}", response.statusCode());
|
|
|
|
|
// send request (ignore response).
|
|
|
|
|
int responseCode = connection.getResponseCode();
|
|
|
|
|
if (responseCode >= 400) {
|
|
|
|
|
LOG.warn("Traffic mirroring request return: {}", responseCode);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
LOG.debug("Traffic mirroring async request return: {}", response.statusCode());
|
|
|
|
|
LOG.debug("Traffic mirroring request return: {}", responseCode);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
LOG.warn("Traffic mirroring request failed", e);
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.exceptionally(ex -> {
|
|
|
|
|
LOG.warn("Traffic mirroring async request failed", ex);
|
|
|
|
|
return null;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|